diff --git a/internal/csi-addons/rbd/reclaimspace.go b/internal/csi-addons/rbd/reclaimspace.go index 9fb124a21ac5..1d3f4bbfa1e1 100644 --- a/internal/csi-addons/rbd/reclaimspace.go +++ b/internal/csi-addons/rbd/reclaimspace.go @@ -37,12 +37,13 @@ import ( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceControllerServer struct { *rs.UnimplementedReclaimSpaceControllerServer + volumeLocks *util.VolumeLocks } // NewReclaimSpaceControllerServer creates a new ReclaimSpaceControllerServer which handles // the ReclaimSpace Service requests from the CSI-Addons specification. -func NewReclaimSpaceControllerServer() *ReclaimSpaceControllerServer { - return &ReclaimSpaceControllerServer{} +func NewReclaimSpaceControllerServer(volumeLocks *util.VolumeLocks) *ReclaimSpaceControllerServer { + return &ReclaimSpaceControllerServer{volumeLocks: volumeLocks} } func (rscs *ReclaimSpaceControllerServer) RegisterService(server grpc.ServiceRegistrar) { @@ -64,6 +65,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( } defer cr.DeleteCredentials() + if acquired := rscs.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rscs.volumeLocks.Release(volumeID) + rbdVol, err := rbdutil.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) if err != nil { return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error()) @@ -90,12 +98,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceNodeServer struct { *rs.UnimplementedReclaimSpaceNodeServer + volumeLocks *util.VolumeLocks } // NewReclaimSpaceNodeServer creates a new IdentityServer which handles the // Identity Service requests from the CSI-Addons specification. -func NewReclaimSpaceNodeServer() *ReclaimSpaceNodeServer { - return &ReclaimSpaceNodeServer{} +func NewReclaimSpaceNodeServer(volumeLocks *util.VolumeLocks) *ReclaimSpaceNodeServer { + return &ReclaimSpaceNodeServer{volumeLocks: volumeLocks} } func (rsns *ReclaimSpaceNodeServer) RegisterService(server grpc.ServiceRegistrar) { @@ -116,6 +125,13 @@ func (rsns *ReclaimSpaceNodeServer) NodeReclaimSpace( return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } + if acquired := rsns.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rsns.volumeLocks.Release(volumeID) + // path can either be the staging path on the node, or the volume path // inside an application container path := req.GetStagingTargetPath() diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 979da3b1c5a7..7d3fddb1367e 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -144,10 +144,8 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) - if err != nil { - log.FatalLogMsg("failed to start node server, err %v\n", err) - } + r.ns = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) + var attr string attr, err = rbd.GetKrbdSupportedFeatures() if err != nil && !errors.Is(err, os.ErrNotExist) { @@ -213,7 +211,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { r.cas.RegisterService(is) if conf.IsControllerServer { - rs := casrbd.NewReclaimSpaceControllerServer() + rs := casrbd.NewReclaimSpaceControllerServer(r.cs.VolumeLocks) r.cas.RegisterService(rs) fcs := casrbd.NewFenceControllerServer() @@ -227,7 +225,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { } if conf.IsNodeServer { - rs := casrbd.NewReclaimSpaceNodeServer() + rs := casrbd.NewReclaimSpaceNodeServer(r.ns.VolumeLocks) r.cas.RegisterService(rs) ekr := casrbd.NewEncryptionKeyRotationServer(r.ns.VolumeLocks)