diff --git a/cmd/integration/make_backup/main.go b/cmd/integration/make_backup/main.go index b9c9da37..7b561080 100644 --- a/cmd/integration/make_backup/main.go +++ b/cmd/integration/make_backup/main.go @@ -125,6 +125,44 @@ func main() { if !done { log.Panicln("failed to complete a restore in 30 seconds") } + deleteOperation, err := client.DeleteBackup( + context.Background(), &pb.DeleteBackupRequest{ + BackupId: backupOperation.BackupId, + }, + ) + if err != nil { + log.Panicf("failed to delete backup: %v", err) + } + done = false + for range 30 { + op, err := opClient.GetOperation( + context.Background(), &pb.GetOperationRequest{ + Id: deleteOperation.Id, + }, + ) + if err != nil { + log.Panicf("failed to get operation: %v", err) + } + if op.GetStatus().String() == types.OperationStateDone.String() { + done = true + break + } + time.Sleep(time.Second) + } + if !done { + log.Panicln("failed to complete a delete backup in 30 seconds") + } + backup, err := client.GetBackup( + context.Background(), + &pb.GetBackupRequest{Id: backupOperation.BackupId}, + ) + if err != nil { + log.Panicf("failed to get backup: %v", err) + } + if backup.GetStatus().String() != types.BackupStateDeleted { + log.Panicf("expected DELETED backup status, but received: %s", backup.GetStatus().String()) + } + scheduleClient := pb.NewBackupScheduleServiceClient(conn) schedules, err := scheduleClient.ListBackupSchedules( context.Background(), &pb.ListBackupSchedulesRequest{ diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go index a4193cbb..c22bb82e 100644 --- a/internal/handlers/delete_backup.go +++ b/internal/handlers/delete_backup.go @@ -33,7 +33,7 @@ func DBOperationHandler( db db.DBConnector, s3 s3.S3Connector, config config.Config, - queryBulderFactory queries.WriteQueryBulderFactory, + queryBuilderFactory queries.WriteQueryBulderFactory, ) error { xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -60,7 +60,7 @@ func DBOperationHandler( operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = timestamppb.Now() return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } @@ -81,7 +81,17 @@ func DBOperationHandler( } if len(backups) == 0 { - return fmt.Errorf("backup not found") + operation.SetState(types.OperationStateError) + operation.SetMessage("Backup not found") + operation.GetAudit().CompletedAt = timestamppb.Now() + return db.UpdateOperation(ctx, operation) + } + + if backups[0].Status != types.BackupStateDeleting { + operation.SetState(types.OperationStateError) + operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backups[0].Status)) + operation.GetAudit().CompletedAt = timestamppb.Now() + return db.UpdateOperation(ctx, operation) } deleteBackup := func(pathPrefix string, bucket string) error { @@ -107,11 +117,8 @@ func DBOperationHandler( switch dbOp.State { case types.OperationStatePending: { - backupToWrite.Status = types.BackupStateDeleting operation.SetState(types.OperationStateRunning) - err := db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), - ) + err := db.UpdateOperation(ctx, operation) if err != nil { return fmt.Errorf("can't update operation: %v", err) } @@ -133,6 +140,6 @@ func DBOperationHandler( } return db.ExecuteUpsert( - ctx, queryBulderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ctx, queryBuilderFactory().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) } diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go index 418784ec..d9595e0f 100644 --- a/internal/handlers/delete_backup_test.go +++ b/internal/handlers/delete_backup_test.go @@ -33,7 +33,7 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) { } backup := types.Backup{ ID: backupID, - Status: types.BackupStateAvailable, + Status: types.BackupStateDeleting, } opMap := make(map[string]types.Operation) @@ -87,7 +87,7 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { } backup := types.Backup{ ID: backupID, - Status: types.BackupStateAvailable, + Status: types.BackupStateDeleting, S3PathPrefix: "pathPrefix", } @@ -162,7 +162,7 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { } backup := types.Backup{ ID: backupID, - Status: types.BackupStateAvailable, + Status: types.BackupStateDeleting, S3PathPrefix: "pathPrefix", } @@ -220,3 +220,73 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { assert.Empty(t, err) assert.Empty(t, objects) } + +func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + dbOp := types.DeleteBackupOperation{ + ID: opId, + BackupID: backupID, + State: types.OperationStateRunning, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + }, + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStateAvailable, + S3PathPrefix: "pathPrefix", + } + + opMap := make(map[string]types.Operation) + backupMap := make(map[string]types.Backup) + s3ObjectsMap := make(map[string][]*s3.Object) + backupMap[backupID] = backup + opMap[opId] = &dbOp + s3ObjectsMap["pathPrefix"] = []*s3.Object{ + { + Key: aws.String("data_1.csv"), + Size: aws.Int64(100), + }, + { + Key: aws.String("data_2.csv"), + Size: aws.Int64(150), + }, + { + Key: aws.String("data_3.csv"), + Size: aws.Int64(200), + }, + } + + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + + s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap) + + handler := NewDBOperationHandler( + dbConnector, s3Connector, config.Config{ + OperationTtlSeconds: 1000, + }, queries.NewWriteTableQueryMock, + ) + + err := handler(ctx, &dbOp) + assert.Empty(t, err) + + // check operation status (should be failed) + op, err := dbConnector.GetOperation(ctx, dbOp.ID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Unexpected backup status: AVAILABLE", op.GetMessage()) + + // check backup status (should be the same as before) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateAvailable, b.Status) +} diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 05e3573e..e06c85cc 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -99,6 +99,8 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques if err != nil { return nil, err } + ctx = xlog.With(ctx, zap.String("OperationID", op.GetID())) + xlog.Debug(ctx, "MakeBackup was started successfully", zap.String("operation", types.OperationToString(op))) return op.Proto(), nil } @@ -151,6 +153,7 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe now := timestamppb.Now() op := &types.DeleteBackupOperation{ + ID: types.GenerateObjectID(), ContainerID: backup.ContainerID, BackupID: req.GetBackupId(), State: types.OperationStatePending, @@ -166,14 +169,20 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe UpdatedAt: now, } - operationID, err := s.driver.CreateOperation(ctx, op) + backupToWrite := types.Backup{ + ID: req.GetBackupId(), + Status: types.BackupStateDeleting, + } + + err = s.driver.ExecuteUpsert( + ctx, queries.NewWriteTableQuery().WithCreateOperation(op).WithUpdateBackup(backupToWrite), + ) if err != nil { xlog.Error(ctx, "can't create operation", zap.Error(err)) return nil, status.Error(codes.Internal, "can't create operation") } - ctx = xlog.With(ctx, zap.String("OperationID", operationID)) - op.ID = operationID + ctx = xlog.With(ctx, zap.String("OperationID", op.GetID())) xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op))) return op.Proto(), nil }