Skip to content

Commit

Permalink
rbd: add locking for VolumeGroupSnapshot operations
Browse files Browse the repository at this point in the history
Add VolumeGroupLocks in the CSI Controller Server so that operations are
protected against concurrent requests for the same VolumeGroupSnapshot.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Nov 11, 2024
1 parent f3d40f9 commit af73fca
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
4 changes: 4 additions & 0 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type ControllerServer struct {
// A map storing all volumes/snapshots with ongoing operations.
OperationLocks *util.OperationLock

// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by volumegroup ID/volumegroup name) return an Aborted error
VolumeGroupLocks *util.VolumeLocks

// Cluster name
ClusterName string

Expand Down
1 change: 1 addition & 0 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
VolumeLocks: util.NewVolumeLocks(),
SnapshotLocks: util.NewVolumeLocks(),
VolumeGroupLocks: util.NewVolumeLocks(),
OperationLocks: util.NewOperationLock(),
}
}
Expand Down
37 changes: 33 additions & 4 deletions internal/rbd/group_controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/status"

"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)

Expand All @@ -50,6 +51,14 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
vgsName = req.GetName()
)

// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(vgsName); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, vgsName)

return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, vgsName)
}
defer cs.VolumeGroupLocks.Release(vgsName)

mgr := NewManager(cs.Driver.GetInstanceID(), req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

Expand Down Expand Up @@ -166,15 +175,25 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(
// 1. verify that all snapshots in the request are all snapshots in the group
// 2. delete the group

groupSnapshotID := req.GetGroupSnapshotId()

// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, groupSnapshotID)

return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, groupSnapshotID)
}
defer cs.VolumeGroupLocks.Release(groupSnapshotID)

mgr := NewManager(cs.Driver.GetInstanceID(), nil, req.GetSecrets())
defer mgr.Destroy(ctx)

groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, req.GetGroupSnapshotId())
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to get volume group snapshot with id %q: %v",
req.GetGroupSnapshotId(), err)
groupSnapshotID, err)
}
defer groupSnapshot.Destroy(ctx)

Expand All @@ -195,15 +214,25 @@ func (cs *ControllerServer) GetVolumeGroupSnapshot(
ctx context.Context,
req *csi.GetVolumeGroupSnapshotRequest,
) (*csi.GetVolumeGroupSnapshotResponse, error) {
groupSnapshotID := req.GetGroupSnapshotId()

// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, groupSnapshotID)

return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, groupSnapshotID)
}
defer cs.VolumeGroupLocks.Release(groupSnapshotID)

mgr := NewManager(cs.Driver.GetInstanceID(), nil, req.GetSecrets())
defer mgr.Destroy(ctx)

groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, req.GetGroupSnapshotId())
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to get volume group snapshot with id %q: %v",
req.GetGroupSnapshotId(), err)
groupSnapshotID, err)
}
defer groupSnapshot.Destroy(ctx)

Expand Down

0 comments on commit af73fca

Please sign in to comment.