diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index 6a54dd43d..fb8f06844 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -59,6 +59,10 @@ func TestRestoreTablesUserIntegration(t *testing.T) {
 		"location":       loc,
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	})
 
 	Print("Log in via restored user and check permissions")
@@ -106,6 +110,10 @@ func TestRestoreTablesNoReplicationIntegration(t *testing.T) {
 		"keyspace":       ksFilter,
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	})
 
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
@@ -296,6 +304,10 @@ func TestRestoreSchemaDropAddColumnIntegration(t *testing.T) {
 		"keyspace":       ksFilter,
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	})
 
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
@@ -354,6 +366,10 @@ func TestRestoreTablesVnodeToTabletsIntegration(t *testing.T) {
 		"keyspace":       ksFilter,
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	})
 
 	validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, c1, c2)
@@ -455,6 +471,10 @@ func TestRestoreTablesPausedIntegration(t *testing.T) {
 		"keyspace":       []string{ks1, ks2},
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	}
 	err = runPausedRestore(t, func(ctx context.Context) error {
 		h.dstCluster.RunID = uuid.NewTime()
@@ -625,6 +645,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			"rate_limit":      []string{"0"},
 			"unpin_agent_cpu": true,
 			"restore_tables":  true,
+			// DC Mapping is required
+			"dc-mapping": []map[string][]string{
+				{"source": {"dc1", "dc2"}, "target": {"dc1"}},
+			},
 		})
 		if err != nil {
 			finishedRestore <- err
@@ -772,6 +796,10 @@ func TestRestoreTablesBatchRetryIntegration(t *testing.T) {
 		"keyspace":       ksFilter,
 		"snapshot_tag":   tag,
 		"restore_tables": true,
+		// DC Mapping is required
+		"dc-mapping": []map[string][]string{
+			{"source": {"dc1"}, "target": {"dc1", "dc2"}},
+		},
 	}
 
 	t.Run("batch retry finished with success", func(t *testing.T) {
@@ -971,6 +999,10 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
 			"parallel":       1,
 			"snapshot_tag":   tag,
 			"restore_tables": true,
+			// DC Mapping is required, because DCs are reversed :D
+			"dc-mapping": []map[string][]string{
+				{"source": {"dc1", "dc2"}, "target": {"dc2", "dc1"}},
+			},
 		})
 		close(res)
 	}()
@@ -1071,3 +1103,161 @@ func TestRestoreTablesProgressIntegration(t *testing.T) {
 		}
 	}
 }
+
+func TestRestoreTablesIntoClusterWithAnotherDCNameIntegration(t *testing.T) {
+	h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedThirdClusterHosts())
+
+	Print("Keyspace setup")
+	// Source cluster
+	ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}"
+	ks := randomizedName("multi_location_")
+	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks))
+
+	// Target cluster
+	ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}"
+	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ks))
+
+	Print("Table setup")
+	tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
+	tab := randomizedName("tab_")
+	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
+	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
+
+	Print("Fill setup")
+	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
+
+	Print("Save filled table into map")
+	srcM := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ks, tab, "id", "data")
+
+	Print("Run backup")
+	loc := []Location{
+		testLocation("one-location-1", ""),
+	}
+	S3InitBucket(t, loc[0].Path)
+	ksFilter := []string{ks}
+	tag := h.runBackup(t, map[string]any{
+		"location":   loc,
+		"keyspace":   ksFilter,
+		"batch_size": 100,
+	})
+
+	Print("Run restore")
+	grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
+	res := make(chan struct{})
+	go func() {
+		h.runRestore(t, map[string]any{
+			"location": loc,
+			"keyspace": ksFilter,
+			// Test if batching does not hang with
+			// limited parallel and location access.
+			"parallel":       1,
+			"snapshot_tag":   tag,
+			"restore_tables": true,
+			// DC Mapping is required
+			"dc-mapping": []map[string][]string{
+				{"source": {"dc1"}, "target": {"dc3"}},
+			},
+		})
+		close(res)
+	}()
+
+	select {
+	case <-res:
+	case <-time.NewTimer(2 * time.Minute).C:
+		t.Fatal("Restore hanged")
+	}
+
+	Print("Save restored table into map")
+	dstM := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ks, tab, "id", "data")
+
+	Print("Validate success")
+	if !maps.Equal(srcM, dstM) {
+		t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
+	}
+}
+
+func TestRestoreOnlyOneDCFromLocation(t *testing.T) {
+	h := newTestHelper(t, ManagedClusterHosts(), ManagedThirdClusterHosts())
+
+	Print("Keyspace setup")
+	// Source cluster
+	ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}"
+	ksTwoDC := randomizedName("two_dc_")
+	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ksTwoDC))
+
+	// Keyspace that's only available in dc2
+	ksStmtOneDC := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1':0, 'dc2': 1}"
+	ksOneDC := randomizedName("one_dc_")
+	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmtOneDC, ksOneDC))
+
+	// Target cluster
+	ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc3': 1}"
+	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksTwoDC))
+	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksOneDC))
+
+	Print("Table setup")
+	tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
+	tab := randomizedName("tab_")
+	for _, ks := range []string{ksTwoDC, ksOneDC} {
+		ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
+		ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
+	}
+
+	Print("Fill setup")
+	for _, ks := range []string{ksTwoDC, ksOneDC} {
+		fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
+	}
+
+	Print("Save filled table into map")
+	srcMTwoDC := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ksTwoDC, tab, "id", "data")
+
+	Print("Run backup")
+	loc := []Location{
+		testLocation("one-location-1", ""),
+	}
+	S3InitBucket(t, loc[0].Path)
+	ksFilter := []string{ksTwoDC, ksOneDC}
+	tag := h.runBackup(t, map[string]any{
+		"location":   loc,
+		"keyspace":   ksFilter,
+		"batch_size": 100,
+	})
+
+	Print("Run restore")
+	grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
+	res := make(chan struct{})
+	go func() {
+		h.runRestore(t, map[string]any{
+			"location": loc,
+			"keyspace": ksFilter,
+			// Test if batching does not hang with
+			// limited parallel and location access.
+			"parallel":       1,
+			"snapshot_tag":   tag,
+			"restore_tables": true,
+			// DC Mapping is required
+			"dc-mapping": []map[string][]string{
+				{"source": {"dc1", "!dc2"}, "target": {"dc3"}},
+			},
+		})
+		close(res)
+	}()
+
+	select {
+	case <-res:
+	case <-time.NewTimer(2 * time.Minute).C:
+		t.Fatal("Restore hanged")
+	}
+
+	Print("Save restored table into map")
+	dstMTwoDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksTwoDC, tab, "id", "data")
+	dstMOneDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksOneDC, tab, "id", "data")
+
+	Print("Validate success")
+	if !maps.Equal(srcMTwoDC, dstMTwoDC) {
+		t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcMTwoDC, dstMTwoDC)
+	}
+	if len(dstMOneDC) != 0 {
+		t.Fatalf("dc2 shouldn't be restored")
+	}
+}
diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go
index 8ad8fb0db..fb9666433 100644
--- a/pkg/service/restore/service_restore_integration_test.go
+++ b/pkg/service/restore/service_restore_integration_test.go
@@ -653,6 +653,9 @@ func TestRestoreTablesSmokeIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	smokeRestore(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser, "{'class': 'NetworkTopologyStrategy', 'dc1': 2}")
@@ -718,7 +721,7 @@ func smokeRestore(t *testing.T, target Target, keyspace string, loadCnt, loadSiz
 
 	Print("When: restore backup on different cluster = (dc1: 3 nodes, dc2: 3 nodes)")
 	if err := dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)); err != nil {
-		t.Fatal(err)
+		t.Fatal("Restore:", err)
 	}
 
 	dstH.validateRestoreSuccess(dstSession, srcSession, target, []table{{ks: keyspace, tab: BigTableName}})
@@ -745,6 +748,9 @@ func TestRestoreTablesRestartAgentsIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreWithAgentRestart(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -822,6 +828,9 @@ func TestRestoreTablesResumeIntegration(t *testing.T) {
 		Parallel:      testParallel,
 		RestoreTables: true,
 		Continue:      true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -848,6 +857,9 @@ func TestRestoreTablesResumeContinueFalseIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreWithResume(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1012,6 +1024,9 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreWithVersions(t, target, testKeyspace, testLoadCnt, testLoadSize, corruptCnt, testUser)
@@ -1305,6 +1320,9 @@ func TestRestoreTablesViewCQLSchemaIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreViewCQLSchema(t, target, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1389,6 +1407,9 @@ func TestRestoreFullViewSSTableSchemaIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreViewSSTableSchema(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1476,6 +1497,9 @@ func TestRestoreFullIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreAllTables(t, schemaTarget, tablesTarget, testKeyspace, testLoadCnt, testLoadSize, testUser)
@@ -1582,6 +1606,9 @@ func TestRestoreFullAlternatorIntegration(t *testing.T) {
 		BatchSize:     testBatchSize,
 		Parallel:      testParallel,
 		RestoreTables: true,
+		DCMappings: DCMappings{
+			{Source: []string{"dc1"}, Target: []string{"dc1", "dc2"}},
+		},
 	}
 
 	restoreAlternator(t, schemaTarget, tablesTarget, testKeyspace, testTable, testUser, testAlternatorPort)