Skip to content

Commit

Permalink
Merge pull request #2225 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…2223-to-release-1.30

[release-1.30] fix: copy volume error in cross zone scenario
  • Loading branch information
andyzhangx authored Mar 15, 2024
2 parents b0ae896 + d0f2a1b commit b57e7a6
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 58 deletions.
84 changes: 48 additions & 36 deletions pkg/azuredisk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,41 +185,15 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

requirement := req.GetAccessibilityRequirements()
diskZone := azureutils.PickAvailabilityZone(requirement, diskParams.Location, topologyKey)
diskZone := azureutils.PickAvailabilityZone(req.GetAccessibilityRequirements(), diskParams.Location, topologyKey)
accessibleTopology := []*csi.Topology{}
if strings.HasSuffix(string(skuName), "ZRS") || strings.HasSuffix(string(skuName), "zrs") {
klog.V(2).Infof("diskZone(%s) is reset as empty since disk(%s) is ZRS(%s)", diskZone, diskParams.DiskName, skuName)
diskZone = ""
// make volume scheduled on all 3 availability zones
for i := 1; i <= 3; i++ {
topology := &csi.Topology{
Segments: map[string]string{topologyKey: fmt.Sprintf("%s-%d", diskParams.Location, i)},
}
accessibleTopology = append(accessibleTopology, topology)
}
// make volume scheduled on all non-zone nodes
topology := &csi.Topology{
Segments: map[string]string{topologyKey: ""},
}
accessibleTopology = append(accessibleTopology, topology)
} else {
accessibleTopology = []*csi.Topology{
{
Segments: map[string]string{topologyKey: diskZone},
},
}
}

if d.enableDiskCapacityCheck {
if ok, err := d.checkDiskCapacity(ctx, diskParams.SubscriptionID, diskParams.ResourceGroup, diskParams.DiskName, requestGiB); !ok {
return nil, err
}
}

klog.V(2).Infof("begin to create azure disk(%s) account type(%s) rg(%s) location(%s) size(%d) diskZone(%v) maxShares(%d)",
diskParams.DiskName, skuName, diskParams.ResourceGroup, diskParams.Location, requestGiB, diskZone, diskParams.MaxShares)

contentSource := &csi.VolumeContentSource{}

if strings.EqualFold(diskParams.WriteAcceleratorEnabled, consts.TrueValue) {
Expand Down Expand Up @@ -251,13 +225,51 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
},
}
subsID := azureutils.GetSubscriptionIDFromURI(sourceID)
if sourceGiB, _ := d.GetSourceDiskSize(ctx, subsID, diskParams.ResourceGroup, path.Base(sourceID), 0, consts.SourceDiskSearchMaxDepth); sourceGiB != nil && *sourceGiB < int32(requestGiB) {
diskParams.VolumeContext[consts.ResizeRequired] = strconv.FormatBool(true)
sourceGiB, disk, err := d.GetSourceDiskSize(ctx, subsID, diskParams.ResourceGroup, path.Base(sourceID), 0, consts.SourceDiskSearchMaxDepth)
if err == nil {
if sourceGiB != nil && *sourceGiB < int32(requestGiB) {
diskParams.VolumeContext[consts.ResizeRequired] = strconv.FormatBool(true)
klog.V(2).Infof("source disk(%s) size(%d) is less than requested size(%d), set resizeRequired as true", sourceID, *sourceGiB, requestGiB)
}
if disk != nil && len(disk.Zones) == 1 {
if disk.Zones[0] != nil {
diskZone = fmt.Sprintf("%s-%s", diskParams.Location, *disk.Zones[0])
klog.V(2).Infof("source disk(%s) is in zone(%s), set diskZone as %s", sourceID, *disk.Zones[0], diskZone)
}
}
} else {
klog.Warningf("failed to get source disk(%s) size, err: %v", sourceID, err)
}
metricsRequest = "controller_create_volume_from_volume"
}
}

if strings.HasSuffix(strings.ToLower(string(skuName)), "zrs") {
klog.V(2).Infof("diskZone(%s) is reset as empty since disk(%s) is ZRS(%s)", diskZone, diskParams.DiskName, skuName)
diskZone = ""
// make volume scheduled on all 3 availability zones
for i := 1; i <= 3; i++ {
topology := &csi.Topology{
Segments: map[string]string{topologyKey: fmt.Sprintf("%s-%d", diskParams.Location, i)},
}
accessibleTopology = append(accessibleTopology, topology)
}
// make volume scheduled on all non-zone nodes
topology := &csi.Topology{
Segments: map[string]string{topologyKey: ""},
}
accessibleTopology = append(accessibleTopology, topology)
} else {
accessibleTopology = []*csi.Topology{
{
Segments: map[string]string{topologyKey: diskZone},
},
}
}

klog.V(2).Infof("begin to create azure disk(%s) account type(%s) rg(%s) location(%s) size(%d) diskZone(%v) maxShares(%d)",
diskParams.DiskName, skuName, diskParams.ResourceGroup, diskParams.Location, requestGiB, diskZone, diskParams.MaxShares)

if skuName == armcompute.DiskStorageAccountTypesUltraSSDLRS {
if diskParams.DiskIOPSReadWrite == "" && diskParams.DiskMBPSReadWrite == "" {
// set default DiskIOPSReadWrite, DiskMBPSReadWrite per request size
Expand Down Expand Up @@ -1167,20 +1179,20 @@ func (d *Driver) getSnapshotByID(ctx context.Context, subsID, resourceGroup, sna
}

// GetSourceDiskSize recursively searches for the sourceDisk and returns: sourceDisk disk size, error
func (d *Driver) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, error) {
func (d *Driver) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, *armcompute.Disk, error) {
if curDepth > maxDepth {
return nil, status.Error(codes.Internal, fmt.Sprintf("current depth (%d) surpassed the max depth (%d) while searching for the source disk size", curDepth, maxDepth))
return nil, nil, status.Error(codes.Internal, fmt.Sprintf("current depth (%d) surpassed the max depth (%d) while searching for the source disk size", curDepth, maxDepth))
}
diskClient, err := d.clientFactory.GetDiskClientForSub(subsID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, nil, status.Error(codes.Internal, err.Error())
}
result, err := diskClient.Get(ctx, resourceGroup, diskName)
if err != nil {
return nil, err
return nil, result, err
}
if result.Properties == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("DiskProperty not found for disk (%s) in resource group (%s)", diskName, resourceGroup))
return nil, result, status.Error(codes.Internal, fmt.Sprintf("DiskProperty not found for disk (%s) in resource group (%s)", diskName, resourceGroup))
}

if result.Properties.CreationData != nil && result.Properties.CreationData.CreateOption != nil && *result.Properties.CreationData.CreateOption == armcompute.DiskCreateOptionCopy {
Expand All @@ -1192,9 +1204,9 @@ func (d *Driver) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, d
}

if (*result.Properties).DiskSizeGB == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("DiskSizeGB for disk (%s) in resourcegroup (%s) is nil", diskName, resourceGroup))
return nil, result, status.Error(codes.Internal, fmt.Sprintf("DiskSizeGB for disk (%s) in resourcegroup (%s) is nil", diskName, resourceGroup))
}
return (*result.Properties).DiskSizeGB, nil
return (*result.Properties).DiskSizeGB, result, nil
}

// The format of snapshot id is /subscriptions/xxx/resourceGroups/xxx/providers/Microsoft.Compute/snapshots/snapshot-xxx-xxx.
Expand Down
10 changes: 5 additions & 5 deletions pkg/azuredisk/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2130,7 +2130,7 @@ func TestGetSourceDiskSize(t *testing.T) {
cntl := gomock.NewController(t)
defer cntl.Finish()
d, _ := NewFakeDriver(cntl)
_, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 2, 1)
_, _, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 2, 1)
expectedErr := status.Errorf(codes.Internal, "current depth (2) surpassed the max depth (1) while searching for the source disk size")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
Expand All @@ -2148,7 +2148,7 @@ func TestGetSourceDiskSize(t *testing.T) {
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetDiskClientForSub(gomock.Any()).Return(diskClient, nil).AnyTimes()
diskClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(disk, nil).AnyTimes()

_, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
_, _, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
expectedErr := status.Error(codes.Internal, "DiskProperty not found for disk (test-disk) in resource group (test-rg)")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
Expand All @@ -2168,7 +2168,7 @@ func TestGetSourceDiskSize(t *testing.T) {
diskClient := mock_diskclient.NewMockInterface(cntl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetDiskClientForSub(gomock.Any()).Return(diskClient, nil).AnyTimes()
diskClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(disk, nil).AnyTimes()
_, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
_, _, err := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
expectedErr := status.Error(codes.Internal, "DiskSizeGB for disk (test-disk) in resourcegroup (test-rg) is nil")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
Expand All @@ -2191,7 +2191,7 @@ func TestGetSourceDiskSize(t *testing.T) {
diskClient := mock_diskclient.NewMockInterface(cntl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetDiskClientForSub(gomock.Any()).Return(diskClient, nil).AnyTimes()
diskClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(disk, nil).AnyTimes()
size, _ := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
size, _, _ := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk", 0, 1)
expectedOutput := diskSizeGB
if *size != expectedOutput {
t.Errorf("actualOutput: (%v), expectedOutput: (%v)", *size, expectedOutput)
Expand Down Expand Up @@ -2227,7 +2227,7 @@ func TestGetSourceDiskSize(t *testing.T) {
diskClient := mock_diskclient.NewMockInterface(cntl)
d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().GetDiskClientForSub(gomock.Any()).Return(diskClient, nil).AnyTimes()
diskClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(disk1, nil).Return(disk2, nil).AnyTimes()
size, _ := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk-1", 0, 2)
size, _, _ := d.GetSourceDiskSize(context.Background(), "", "test-rg", "test-disk-1", 0, 2)
expectedOutput := diskSizeGB2
if *size != expectedOutput {
t.Errorf("actualOutput: (%v), expectedOutput: (%v)", *size, expectedOutput)
Expand Down
16 changes: 8 additions & 8 deletions pkg/azuredisk/controllerserver_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (d *DriverV2) CreateVolume(ctx context.Context, req *csi.CreateVolumeReques
}

subsID := azureutils.GetSubscriptionIDFromURI(sourceID)
if sourceGiB, _ := d.GetSourceDiskSize(ctx, subsID, diskParams.ResourceGroup, path.Base(sourceID), 0, consts.SourceDiskSearchMaxDepth); sourceGiB != nil && *sourceGiB < int32(requestGiB) {
if sourceGiB, _, _ := d.GetSourceDiskSize(ctx, subsID, diskParams.ResourceGroup, path.Base(sourceID), 0, consts.SourceDiskSearchMaxDepth); sourceGiB != nil && *sourceGiB < int32(requestGiB) {
diskParams.VolumeContext[consts.ResizeRequired] = strconv.FormatBool(true)
}
}
Expand Down Expand Up @@ -963,20 +963,20 @@ func (d *DriverV2) getSnapshotByID(ctx context.Context, subsID, resourceGroup, s
}

// GetSourceDiskSize recursively searches for the sourceDisk and returns: sourceDisk disk size, error
func (d *DriverV2) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, error) {
func (d *DriverV2) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, *armcompute.Disk, error) {
if curDepth > maxDepth {
return nil, status.Error(codes.Internal, fmt.Sprintf("current depth (%d) surpassed the max depth (%d) while searching for the source disk size", curDepth, maxDepth))
return nil, nil, status.Error(codes.Internal, fmt.Sprintf("current depth (%d) surpassed the max depth (%d) while searching for the source disk size", curDepth, maxDepth))
}
diskClient, err := d.clientFactory.GetDiskClientForSub(subsID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, nil, status.Error(codes.Internal, err.Error())
}
result, err := diskClient.Get(ctx, resourceGroup, diskName)
if err != nil {
return nil, err
return nil, result, err
}
if result.Properties == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("DiskProperty not found for disk (%s) in resource group (%s)", diskName, resourceGroup))
return nil, result, status.Error(codes.Internal, fmt.Sprintf("DiskProperty not found for disk (%s) in resource group (%s)", diskName, resourceGroup))
}

if result.Properties.CreationData != nil && result.Properties.CreationData.CreateOption != nil && *result.Properties.CreationData.CreateOption == armcompute.DiskCreateOptionCopy {
Expand All @@ -988,9 +988,9 @@ func (d *DriverV2) GetSourceDiskSize(ctx context.Context, subsID, resourceGroup,
}

if (*result.Properties).DiskSizeGB == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("DiskSizeGB for disk (%s) in resourcegroup (%s) is nil", diskName, resourceGroup))
return nil, result, status.Error(codes.Internal, fmt.Sprintf("DiskSizeGB for disk (%s) in resourcegroup (%s) is nil", diskName, resourceGroup))
}
return (*result.Properties).DiskSizeGB, nil
return (*result.Properties).DiskSizeGB, result, nil
}

// The format of snapshot id is /subscriptions/xxx/resourceGroups/xxx/providers/Microsoft.Compute/snapshots/snapshot-xxx-xxx.
Expand Down
2 changes: 1 addition & 1 deletion pkg/azuredisk/fake_azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var (
type FakeDriver interface {
CSIDriver

GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, error)
GetSourceDiskSize(ctx context.Context, subsID, resourceGroup, diskName string, curDepth, maxDepth int) (*int32, *armcompute.Disk, error)

setNextCommandOutputScripts(scripts ...testingexec.FakeAction)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,12 @@ func (t *DynamicallyProvisionedVolumeCloningTest) Run(ctx context.Context, clien
clonedVolume.ClaimSize = t.ClonedVolumeSize
}

zone := tpod.GetZoneForVolume(ctx, 0)

t.PodWithClonedVolume.Volumes = []VolumeDetails{clonedVolume}
tpod, cleanups = t.PodWithClonedVolume.SetupWithDynamicVolumes(ctx, client, namespace, t.CSIDriver, t.StorageClassParameters)
for i := range cleanups {
defer cleanups[i](ctx)
}

// Since an LRS disk cannot be cloned to a different zone, add a selector to the pod so
// that it is created in the same zone as the source disk.
if len(zone) != 0 {
tpod.SetNodeSelector(map[string]string{"topology.disk.csi.azure.com/zone": zone})
}

ginkgo.By("deploying a second pod with cloned volume")
tpod.Create(ctx)
defer tpod.Cleanup(ctx)
Expand Down

0 comments on commit b57e7a6

Please sign in to comment.