feat(restore): uses dc mappings for restoring tables
This introduces the use of DC mappings when restoring tables.
Each DC now downloads only the data from its corresponding source DC(s),
according to the user-provided mapping.
Some DCs can also be explicitly ignored.

Fixes: #3829
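For illustration, a DC mapping in the restore target might look like this. The field names follow the JSON tags introduced in model.go below; the DC names and the surrounding payload shape are hypothetical:

```json
{
  "dc-mapping": [
    {
      "source": ["eu-dc1", "eu-dc2"],
      "ignore_source": ["us-dc"],
      "target": ["eu-dc1", "eu-dc2"],
      "ignore_target": []
    }
  ]
}
```

With such a mapping, nodes in each target DC download only the data backed up by the mapped source DC(s), and manifests from ignored source DCs are skipped entirely.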
VAveryanov8 committed Jan 16, 2025
1 parent e3d9b34 commit 532ad04
Showing 8 changed files with 608 additions and 15 deletions.
15 changes: 9 additions & 6 deletions pkg/service/restore/batch.go
@@ -8,6 +8,7 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

@@ -57,7 +58,7 @@ type batchDispatcher struct {
hostShardCnt map[string]uint
}

-func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher {
+func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string, hostDCs map[string][]string) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
@@ -70,7 +71,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
-workloadProgress: newWorkloadProgress(workload, locationHosts),
+workloadProgress: newWorkloadProgress(workload, locationHosts, hostDCs),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
@@ -106,7 +107,7 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

-func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
+func newWorkloadProgress(workload Workload, locationHosts map[Location][]string, hostDCs map[string][]string) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
@@ -121,7 +122,9 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string)
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
-hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
+dcsInLoc := locationDC[loc.StringWithoutDC()]
+hostAllDCs := hostDCs[h]
+hostDCAccess[h] = append(hostDCAccess[h], strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()...)
}
}
return workloadProgress{
@@ -201,8 +204,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
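The core behavioral change in batch.go is visible in newWorkloadProgress: previously a host was granted access to every DC stored in a location it could reach; now its access is the intersection of the location's DCs and the DCs mapped to that host. A minimal standalone sketch of that computation, with hypothetical inputs (strset is github.com/scylladb/go-set/strset, as imported above):

```go
package main

import (
	"fmt"

	"github.com/scylladb/go-set/strset"
)

func main() {
	dcsInLoc := []string{"dc1", "dc2"}   // DCs whose backup data lives in this location
	hostAllDCs := []string{"dc2", "dc3"} // DCs assigned to this host via the DC mapping

	// The host may only download data for DCs that appear in both lists.
	access := strset.Intersection(strset.New(dcsInLoc...), strset.New(hostAllDCs...)).List()
	fmt.Println(access) // [dc2]
}
```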
169 changes: 168 additions & 1 deletion pkg/service/restore/batch_test.go
@@ -5,6 +5,8 @@ package restore
import (
"testing"

"github.com/google/go-cmp/cmp"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

@@ -114,7 +116,13 @@ func TestBatchDispatcher(t *testing.T) {
"h3": 3,
}

-bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts)
+hostDCs := map[string][]string{
+"h1": {"dc1", "dc2"},
+"h2": {"dc1", "dc2"},
+"h3": {"dc3"},
+}
+
+bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts, hostDCs)

scenario := []struct {
host string
@@ -166,3 +174,162 @@ func TestBatchDispatcher(t *testing.T) {
t.Fatalf("Expected sstables to be batched: %s", err)
}
}

func TestNewWorkloadProgress(t *testing.T) {
testCases := []struct {
name string

workload Workload
locationHosts map[backupspec.Location][]string
hostDCs map[string][]string

expected map[string][]string
}{
{
name: "one location with one DC",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
},
},
{
name: "one location with two DC's",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "one location with two DC's, more nodes",
workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}),
locationHosts: map[backupspec.Location][]string{
{}: {"host1", "host2", "host3", "host4"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},
},
{
name: "two locations with one DC each",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host2"},
{Path: "location2"}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "two locations with one DC each, but some hosts doesn't have access",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host3", "host4"},
{Path: "location2"}: {"host1", "host2", "host4"},
},
hostDCs: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},

expected: map[string][]string{
"host1": {"dc1"},
"host2": nil,
"host3": nil,
"host4": {"dc2"},
},
},
{
name: "two locations with one DC each, but hosts maps to all dcs",
workload: generateWorkload(t,
[]string{"location1", "location2"},
map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}},
),
locationHosts: map[backupspec.Location][]string{
{Path: "location1"}: {"host1", "host2"},
{Path: "location2"}: {"host1", "host2"},
},
hostDCs: map[string][]string{
"host1": {"dc1", "dc2"},
"host2": {"dc1", "dc2"},
},

expected: map[string][]string{
"host1": {"dc1", "dc2"},
"host2": {"dc1", "dc2"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
wp := newWorkloadProgress(tc.workload, tc.locationHosts, tc.hostDCs)
if diff := cmp.Diff(wp.hostDCAccess, tc.expected); diff != "" {
t.Fatalf("Actual != Expected: %s", diff)
}
})
}
}

func generateWorkload(t *testing.T, locationPaths []string, dcsInLocation map[string][]string) Workload {
t.Helper()

var remoteDirs []RemoteDirWorkload
for _, path := range locationPaths {
dcs, ok := dcsInLocation[path]
if !ok {
t.Fatalf("each location should have corresponding entry in dcsInLocation map")
}
for _, dc := range dcs {
remoteDirs = append(remoteDirs, RemoteDirWorkload{
ManifestInfo: &backupspec.ManifestInfo{
DC: dc,
Location: backupspec.Location{Path: path},
},
})
}
}
return Workload{RemoteDir: remoteDirs}
}
4 changes: 4 additions & 0 deletions pkg/service/restore/index.go
@@ -76,6 +76,10 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat
func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) {
var rawWorkload []RemoteDirWorkload
err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error {
if slices.Contains(w.target.ignoredSourceDC, m.DC) {
w.logger.Info(ctx, "Ignoring DC", "dc", m.DC, "location", location)
return nil
}
return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
55 changes: 55 additions & 0 deletions pkg/service/restore/model.go
@@ -10,6 +10,7 @@ import (

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
@@ -33,9 +34,14 @@ type Target struct {
RestoreSchema bool `json:"restore_schema,omitempty"`
RestoreTables bool `json:"restore_tables,omitempty"`
Continue bool `json:"continue"`
DCMappings DCMappings `json:"dc-mapping"`

// Cache for host with access to remote location
locationHosts map[Location][]string `json:"-"`
// Cache for hosts and their DCs after applying DCMappings
hostDCs map[string][]string
// Cache for DCs that shouldn't be restored from the location
ignoredSourceDC []string
}

const (
@@ -82,6 +88,16 @@ func (t Target) validateProperties(dcMap map[string][]string) error {
if t.RestoreSchema && t.Keyspace != nil {
return errors.New("restore schema always restores 'system_schema.*' tables only, no need to specify '--keyspace' flag")
}
// Check for duplicates in Location
allLocations := strset.New()
for _, l := range t.Location {
p := l.RemotePath("")
if allLocations.Has(p) {
return errors.Errorf("location %s is specified multiple times", l)
}
allLocations.Add(p)
}

return nil
}

@@ -289,3 +305,42 @@ type HostInfo struct {
Transfers int
RateLimit int
}

// DCMappings represents how DCs from the backup cluster are mapped to DCs in the restore cluster.
// For details about how DCs can be mapped, refer to the --dc-mapping documentation.
type DCMappings []DCMapping

// DCMapping represents a single datacenter mapping. See DCMappings for details.
type DCMapping struct {
Source []string `json:"source"`
IgnoreSource []string `json:"ignore_source"`
Target []string `json:"target"`
IgnoreTarget []string `json:"ignore_target"`
}

func (mappings DCMappings) calculateMappings() (targetMap map[string][]string, ignoreSource, ignoreTarget []string) {
targetMap = map[string][]string{}
for _, mapping := range mappings {
ignoreSource = append(ignoreSource, mapping.IgnoreSource...)
ignoreTarget = append(ignoreTarget, mapping.IgnoreTarget...)

if len(mapping.Source) == 0 || len(mapping.Target) == 0 {
continue
}
tIdx, sIdx := 0, 0
for {
target, source := mapping.Target[tIdx], mapping.Source[sIdx]
targetMap[target] = append(targetMap[target], source)
if tIdx == len(mapping.Target)-1 && sIdx == len(mapping.Source)-1 {
break
}
if tIdx < len(mapping.Target)-1 {
tIdx++
}
if sIdx < len(mapping.Source)-1 {
sIdx++
}
}
}
return targetMap, ignoreSource, ignoreTarget
}
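calculateMappings walks Source and Target in lockstep; when one list is shorter, its last element is repeated until the longer list is exhausted, so uneven mappings still cover every DC. A small worked example of usage inside this package (hypothetical DC names):

```go
mappings := DCMappings{
	{Source: []string{"dc1", "dc2"}, Target: []string{"dc3"}},
	{Source: []string{"dc4"}, Target: []string{"dc5", "dc6"}, IgnoreSource: []string{"dc7"}},
}
targetMap, ignoreSource, ignoreTarget := mappings.calculateMappings()
// targetMap:    map[dc3:[dc1 dc2] dc5:[dc4] dc6:[dc4]]
// ignoreSource: [dc7]
// ignoreTarget: [] (nil)
```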