Skip to content

Commit

Permalink
csiaddons: initial implementation of CSI-Addons VolumeGroup
Browse files Browse the repository at this point in the history
Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Jul 10, 2024
1 parent ea274ed commit 8cc0d5d
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 15 deletions.
2 changes: 1 addition & 1 deletion internal/csi-addons/rbd/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (is *IdentityServer) GetCapabilities(
}, &identity.Capability{
Type: &identity.Capability_VolumeGroup_{
VolumeGroup: &identity.Capability_VolumeGroup{
Type: identity.Capability_VolumeGroup_MODIFY_VOLUME_GROUP,
Type: identity.Capability_VolumeGroup_DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES,
},
},
}, &identity.Capability{
Expand Down
175 changes: 162 additions & 13 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,179 @@ limitations under the License.
package rbd

import (
corerbd "github.com/ceph/ceph-csi/internal/rbd"
"context"
"fmt"

"github.com/ceph/ceph-csi/internal/rbd"
types "github.com/ceph/ceph-csi/internal/rbd_types"
"github.com/ceph/ceph-csi/internal/util/log"

"github.com/csi-addons/spec/lib/go/volumegroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// VolumeGroupServer struct of rbd CSI driver with supported methods of VolumeGroup
// controller server spec.
// VolumeGroupServer struct of rbd CSI driver with supported methods of
// VolumeGroup controller server spec.
type VolumeGroupServer struct {
// added UnimplementedControllerServer as a member of
// ControllerServer. if volumegroup spec add more RPC services in the proto
// file, then we don't need to add all RPC methods leading to forward
// compatibility.
// added UnimplementedControllerServer as a member of ControllerServer.
// if volumegroup spec add more RPC services in the proto file, then we
// don't need to add all RPC methods leading to forward compatibility.
*volumegroup.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions
*corerbd.ControllerServer
}

// NewVolumeGroupServer creates a new VolumeGroupServer which handles
// the VolumeGroup Service requests from the CSI-Addons specification.
func NewVolumeGroupServer(c *corerbd.ControllerServer) *VolumeGroupServer {
return &VolumeGroupServer{ControllerServer: c}
// NewVolumeGroupServer creates a new VolumeGroupServer which handles the
// VolumeGroup Service requests from the CSI-Addons specification.
func NewVolumeGroupServer() *VolumeGroupServer {
return &VolumeGroupServer{}
}

func (vs *VolumeGroupServer) RegisterService(server grpc.ServiceRegistrar) {
volumegroup.RegisterControllerServer(server, vs)
}

// CreateVolumeGroup RPC call to create a volume group.
//
// This RPC will be called by the CO to create a new volume group on behalf of
// a user. This operation MUST be idempotent. If a volume group corresponding
// to the specified volume group name already exists, is compatible with the
// specified parameters in the CreateVolumeGroupRequest, the Plugin MUST reply
// 0 OK with the corresponding CreateVolumeGroupResponse. CSI Plugins MAY
// create the following types of volume groups:
//
// Create a new empty volume group or a group with specific volumes. Note that
// N volumes with some backend label Y could be considered to be in "group Y"
// which might not be a physical group on the storage backend. In this case, an
// empty group can still be created by the CO to hold volumes. After the empty
// group is created, create a new volume. CO may call
// ModifyVolumeGroupMembership to add new volumes to the group.
func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve all volumes
volumes := make([]types.Volume, len(req.GetVolumeIds()))
for i, id := range req.GetVolumeIds() {
vol, err := mgr.GetVolumeByID(ctx, id)
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
"failed to find required volume %q for volume group %q: %s",
id,
req.GetName(),
err.Error())
}

//nolint:gocritic // need to call .Destroy() for all volumes
defer vol.Destroy(ctx)
volumes[i] = vol
}

log.DebugLog(ctx, fmt.Sprintf("all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName()))

// create a RBDVolumeGroup
vg, err := mgr.CreateVolumeGroup(ctx, req.GetName())
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to create volume group %q: %s",
req.GetName(),
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q had been created", req.GetName()))

// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to add volume %q to volume group %q: %s",
vol,
req.GetName(),
err.Error())
}
}

log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName()))

return &volumegroup.CreateVolumeGroupResponse{
VolumeGroup: vg.ToCSI(ctx),
}, nil
}

// DeleteVolumeGroup RPC call to delete a volume group.
//
// From the spec:
// This RPC will be called by the CO to delete a volume group on behalf of a
// user. This operation MUST be idempotent.
//
// If a volume group corresponding to the specified volume_group_id does not
// exist or the artifacts associated with the volume group do not exist
// anymore, the Plugin MUST reply 0 OK.
//
// A volume cannot be deleted individually when it is part of the group. It has
// to be removed from the group first. Delete a volume group will delete all
// volumes in the group.
//
// Note:
// The undocumented DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES capability is set. There
// is no need to delete eack volume that may be part of the volume group. If
// the volume group is not empty, a FAILED_PRECONDITION error will be returned.
func (vs *VolumeGroupServer) DeleteVolumeGroup(
ctx context.Context,
req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
mgr := rbd.NewManager(nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"could not find volume group %q: %s",
req.GetVolumeGroupId(),
err.Error())
}
defer vg.Destroy(ctx)

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been found", req.GetVolumeGroupId()))

// verify that the volume group is empty
volumes, err := vg.ListVolumes(ctx)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"could not list volumes for voluem group %q: %s",
req.GetVolumeGroupId(),
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)))

if len(volumes) != 0 {
return nil, status.Errorf(
codes.FailedPrecondition,
"rejecting to delete non-empty volume group %q",
req.GetVolumeGroupId())
}

// delete the volume group
err = mgr.DeleteVolumeGroup(ctx, vg)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to delete volume group %q: %s",
req.GetVolumeGroupId(),
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been deleted", req.GetVolumeGroupId()))

return &volumegroup.DeleteVolumeGroupResponse{}, nil
}
2 changes: 1 addition & 1 deletion internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
rcs := casrbd.NewReplicationServer(NewControllerServer(r.cd))
r.cas.RegisterService(rcs)

vgcs := casrbd.NewVolumeGroupServer(NewControllerServer(r.cd))
vgcs := casrbd.NewVolumeGroupServer()
r.cas.RegisterService(vgcs)
}

Expand Down

0 comments on commit 8cc0d5d

Please sign in to comment.