Skip to content

Commit

Permalink
feat(restore): filter restored manifests by dc
Browse files Browse the repository at this point in the history
Fixes #3829
  • Loading branch information
Michal-Leszczynski committed Oct 8, 2024
1 parent a45aaad commit 8ef0981
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
21 changes: 15 additions & 6 deletions pkg/service/restore/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
)

// LocationWorkload represents aggregated restore workload
Expand Down Expand Up @@ -56,10 +57,10 @@ type SSTable struct {
}

// IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir.
func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) {
func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location, dcFilters []string) ([]LocationWorkload, error) {
var workload []LocationWorkload
for _, l := range locations {
lw, err := w.indexLocationWorkload(ctx, l)
lw, err := w.indexLocationWorkload(ctx, l, dcFilters)
if err != nil {
return nil, errors.Wrapf(err, "index workload in %s", l)
}
Expand All @@ -68,8 +69,8 @@ func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location)
return workload, nil
}

func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) {
rawWorkload, err := w.createRemoteDirWorkloads(ctx, location)
func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location, dcFilters []string) (LocationWorkload, error) {
rawWorkload, err := w.createRemoteDirWorkloads(ctx, location, dcFilters)
if err != nil {
return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads")
}
Expand All @@ -84,9 +85,17 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
return workload, nil
}

func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) {
func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location, dcFilters []string) ([]RemoteDirWorkload, error) {
dcs, err := dcfilter.NewFilter(dcFilters)
if err != nil {
return nil, errors.Wrapf(err, "create dc filter")
}

var rawWorkload []RemoteDirWorkload
err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error {
err = w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error {
if !dcs.Check(m.DC) {
return nil
}
return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Target struct {
Location []Location `json:"location"`
Keyspace []string `json:"keyspace,omitempty"`
Datacenter []string `json:"datacenter,omitempty"`
SnapshotTag string `json:"snapshot_tag"`
BatchSize int `json:"batch_size,omitempty"`
Parallel int `json:"parallel,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions pkg/service/restore/schema_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"

Expand Down Expand Up @@ -205,7 +206,16 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc
return w.workFunc(ctx, fm)
}

dcs, err := dcfilter.NewFilter(w.target.Datacenter)
if err != nil {
return errors.Wrapf(err, "create dc filter")
}

manifestDownloadHandler := func(miwc ManifestInfoWithContent) error {
if !dcs.Check(miwc.DC) {
return nil
}

w.logger.Info(ctx, "Downloading schema from manifest", "manifest", miwc.ManifestInfo)
defer w.logger.Info(ctx, "Downloading schema from manifest finished", "manifest", miwc.ManifestInfo)

Expand Down
2 changes: 1 addition & 1 deletion pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
w.logger.Info(ctx, "Started restoring tables")
defer w.logger.Info(ctx, "Restoring tables finished")

workload, err := w.IndexWorkload(ctx, w.target.Location)
workload, err := w.IndexWorkload(ctx, w.target.Location, w.target.Datacenter)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/service/restore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
Expand Down Expand Up @@ -336,9 +337,18 @@ func (w *worker) initUnits(ctx context.Context) error {
unitMap = make(map[string]Unit)
)

dcs, err := dcfilter.NewFilter(w.target.Datacenter)
if err != nil {
return errors.Wrapf(err, "create dc filter")
}

var foundManifest bool
for _, l := range w.target.Location {
manifestHandler := func(miwc ManifestInfoWithContent) error {
if !dcs.Check(miwc.DC) {
return nil
}

foundManifest = true

filesHandler := func(fm FilesMeta) {
Expand Down

0 comments on commit 8ef0981

Please sign in to comment.