Skip to content

Commit

Permalink
[WIP] cleanups and more function implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
nixpanic committed Mar 18, 2024
1 parent 2cd37d9 commit e9cb542
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 76 deletions.
134 changes: 82 additions & 52 deletions internal/rbd/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (

// cephConfig contains the configuration parameters for the Ceph cluster.
type cephConfig struct {
clusterID string
mons string
pool string
journalPool string
namespace string
clusterID string
mons string
pool string
journalPool string
namespace string
groupNamePrefix string
}

func getCephConfig(ctx context.Context, params, secrets map[string]string) (*cephConfig, error) {
Expand Down Expand Up @@ -64,12 +65,18 @@ func getCephConfig(ctx context.Context, params, secrets map[string]string) (*cep
return nil, errors.New("missing required parameter: radosNamespace")
}

namePrefix := params["groupNamePrefix"]
if namePrefix == "" {
return nil, errors.New("missing required parameter: groupNamePrefix")
}

return &cephConfig{
clusterID: clusterID,
mons: mons,
pool: pool,
journalPool: journalPool,
namespace: namespace,
clusterID: clusterID,
mons: mons,
pool: pool,
journalPool: journalPool,
namespace: namespace,
groupNamePrefix: namePrefix,
}, nil
}

Expand Down Expand Up @@ -105,6 +112,27 @@ func getVolumesForGroup(ctx context.Context, volumeIDs []string, secrets map[str
return volumes, nil
}

func initVolumeGroup(ctx context.Context, config *cephConfig, name string, secrets map[string]string) (types.VolumeGroup, error) {
group := rbd_group.NewVolumeGroup(ctx, name, config.clusterID, secrets)

err := group.SetMonitors(ctx, config.mons)
if err != nil {
return nil, err
}

err = group.SetPool(ctx, config.pool)
if err != nil {
return nil, err
}

err = group.SetJournalNamespace(ctx, config.journalPool, config.namespace)
if err != nil {
return nil, err
}

return group, nil
}

func (cs *ControllerServer) CreateVolumeGroupSnapshot(ctx context.Context, req *csi.CreateVolumeGroupSnapshotRequest) (*csi.CreateVolumeGroupSnapshotResponse, error) {

// 1. resolve each rbd-image from the volume-id
Expand All @@ -123,28 +151,26 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(ctx context.Context, req *
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

group := rbd_group.NewVolumeGroup(ctx, req.GetName(), config.clusterID, req.GetSecrets())
defer group.Destroy(ctx)

err = group.SetMonitors(ctx, config.mons)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
for _, v := range volumes {
defer v.Destroy()
}

err = group.SetPool(ctx, config.pool)
group, err := initVolumeGroup(ctx, config, req.GetName(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer group.Destroy(ctx)

err = group.SetJournalNamespace(ctx, config.journalPool, config.namespace)
// TODO: take a lock on the request

err = group.Create(ctx, config.groupNamePrefix)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

// TODO: add images to the group
for _, volume := range volumes {
err = group.AddVolume(ctx, volume)
// add images to the group
for _, v := range volumes {
err = group.AddVolume(ctx, v)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}
Expand All @@ -156,41 +182,21 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(ctx context.Context, req *
}
defer groupSnapshot.Destroy(ctx)

groupSnapshotID, err := groupSnapshot.GetID(ctx)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

snapshots, err := groupSnapshot.ListSnapshots(ctx)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

csiSnapshots := make([]*csi.Snapshot, len(snapshots))
for i, snapshot := range snapshots {
csiSnapshot, err := snapshot.ToCSISnapshot(ctx)
// remove images from the group
for _, v := range volumes {
err = group.RemoveVolume(ctx, v)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

csiSnapshots[i] = csiSnapshot
}

// TODO: remove images from the group
for _, volume := range volumes {
err = group.RemoveVolume(ctx, volume)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}
csiGroupSnapshot, err := groupSnapshot.ToCSIVolumeGroupSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

return &csi.CreateVolumeGroupSnapshotResponse{
GroupSnapshot: &csi.VolumeGroupSnapshot{
GroupSnapshotId: groupSnapshotID,
Snapshots: csiSnapshots,
CreationTime: nil,
ReadyToUse: groupSnapshot.GetReadyToUse(ctx),
},
GroupSnapshot: csiGroupSnapshot,
}, nil
}

Expand All @@ -199,11 +205,35 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context, req *
// 1. verify that all snapshots in the request are all snapshots in the group
// 2. delete the group

return nil, nil
snapshot, err := rbd_group.GetVolumeGroupSnapshot(ctx, req.GetGroupSnapshotId(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer snapshot.Destroy(ctx)

err = snapshot.Delete(ctx)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}

// TODO
// sortof optional, only used for static/pre-provisioned VolumeGroupSnapshots
func (cs *ControllerServer) GetVolumeGroupSnapshot(ctx context.Context, req *csi.GetVolumeGroupSnapshotRequest) (*csi.GetVolumeGroupSnapshotResponse, error) {
return nil, nil
snapshot, err := rbd_group.GetVolumeGroupSnapshot(ctx, req.GetGroupSnapshotId(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer snapshot.Destroy(ctx)

csiGroupSnapshot, err := snapshot.ToCSIVolumeGroupSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return &csi.GetVolumeGroupSnapshotResponse{
GroupSnapshot: csiGroupSnapshot,
}, nil
}
19 changes: 14 additions & 5 deletions internal/rbd_group/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rbd_group
import (
"context"
"fmt"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"

Expand Down Expand Up @@ -55,18 +56,26 @@ func (rgs *rbdGroupSnapshot) String() string {
return fmt.Sprintf("%s@%s", rgs.parent, rgs.snapName)
}

func (rgs *rbdGroupSnapshot) GetCreationTime(ctx context.Context) (*time.Time, error) {
return nil, nil
}

func (rgs *rbdGroupSnapshot) GetReadyToUse(ctx context.Context) (bool, error) {
return false, nil
}

func (rgs *rbdGroupSnapshot) ToCSISnapshot(ctx context.Context) (*csi.Snapshot, error) {
parentID, err := rgs.parent.GetID(ctx)
if err != nil {
return nil, err
}

return &csi.Snapshot{
SizeBytes: 0,
SnapshotId: "",
SourceVolumeId: "",
CreationTime: nil,
ReadyToUse: false,
SizeBytes: 0,
SnapshotId: "",
SourceVolumeId: "",
CreationTime: nil,
ReadyToUse: false,
GroupSnapshotId: parentID,
}, nil
}
Expand Down
105 changes: 97 additions & 8 deletions internal/rbd_group/volume_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

const (
groupSuffix = "rbd-group"
groupPrefix = "rbd-group"
)

var (
Expand Down Expand Up @@ -71,6 +70,54 @@ func NewVolumeGroup(ctx context.Context, name, clusterID string, secrets map[str
}
}

func GetVolumeGroup(ctx context.Context, id string, secrets map[string]string) (types.VolumeGroup, error) {
csiID := util.CSIIdentifier{}

err := csiID.DecomposeCSIID(id)
if err != nil {
return nil, err
}

mons, _, err := util.GetMonsAndClusterID(ctx, csiID.ClusterID, false)
if err != nil {
return nil, err
}

namespace, err := util.GetRadosNamespace(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return nil, err
}

vg := &rbdVolumeGroup{
clusterID: csiID.ClusterID,
monitors: mons,
secrets: secrets,
id: id,
}
defer func() {
if err != nil {
vg.Destroy(ctx)
}
}()

vg.credentials, err = util.NewUserCredentials(secrets)
if err != nil {
return nil, err
}

pool, err := util.GetPoolName(mons, vg.credentials, csiID.LocationID)
if err != nil {
return nil, err
}

err = vg.SetJournalNamespace(ctx, pool, namespace)
if err != nil {
return nil, err
}

return vg, nil
}

func (rvg *rbdVolumeGroup) validate() error {
if rvg.ioctx == nil {
return ErrRBDGroupNotConnected
Expand Down Expand Up @@ -107,9 +154,9 @@ func (rvg *rbdVolumeGroup) Destroy(ctx context.Context) {
}

func (rvg *rbdVolumeGroup) GetID(ctx context.Context) (string, error) {
// FIXME: this should be the group-snapshot-handle
if rvg.id != "" {
return rvg.id, nil
if rvg.id == "" {
// FIXME: rbdVolumeGroup need have called .Create() or GetVolumeGroup()
return "", errors.New("BUG: rbdVolumeGroup does not have ID set")
}

return rvg.id, nil
Expand Down Expand Up @@ -165,7 +212,7 @@ func (rvg *rbdVolumeGroup) SetJournalNamespace(ctx context.Context, pool, namesp
return nil
}

func (rvg *rbdVolumeGroup) Create(ctx context.Context) error {
func (rvg *rbdVolumeGroup) Create(ctx context.Context, prefix string) error {
if err := rvg.validate(); err != nil {
return err
}
Expand All @@ -185,7 +232,7 @@ func (rvg *rbdVolumeGroup) Create(ctx context.Context) error {
rvg.journalPool,
journalPoolID,
rvg.name,
groupPrefix)
prefix)
if err != nil {
return err
}
Expand All @@ -210,15 +257,57 @@ func (rvg *rbdVolumeGroup) AddVolume(ctx context.Context, image types.Volume) er
return err
}

return image.AddToGroup(ctx, rvg.ioctx, rvg.name)
err := image.AddToGroup(ctx, rvg.ioctx, rvg.name)
if err != nil {
return err
}

rvg.volumes = append(rvg.volumes, image)

return nil
}

func (rvg *rbdVolumeGroup) RemoveVolume(ctx context.Context, image types.Volume) error {
if err := rvg.validate(); err != nil {
return err
}

return image.RemoveFromGroup(ctx, rvg.ioctx, rvg.name)
// volume was already removed from the group
if len(rvg.volumes) == 0 {
return nil
}

err := image.RemoveFromGroup(ctx, rvg.ioctx, rvg.name)
if err != nil {
return err
}

// toRemove contain the ID of the volume that is removed from the group
toRemove, err := image.GetID(ctx)
if err != nil {
return err
}

// volumes is the updated list, without the volume that was removed
volumes := make([]types.Volume, 0)
for _, v := range rvg.volumes {
id, err := v.GetID(ctx)
if err != nil {
return err
}

if id == toRemove {
// do not add the volume to the list
continue
}

volumes = append(volumes, v)
}

// update the list of volumes
rvg.volumes = volumes

return nil
}

func (rvg *rbdVolumeGroup) CreateSnapshot(ctx context.Context, snapName string) (types.VolumeGroupSnapshot, error) {
Expand Down
Loading

0 comments on commit e9cb542

Please sign in to comment.