Skip to content

Commit

Permalink
rbd: refactor the replication code
Browse files Browse the repository at this point in the history
Added an interface to refactor the mirroring code
so that it works for both volumes and volume groups.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 committed Jul 19, 2024
1 parent f11fa81 commit 1899b64
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 139 deletions.
192 changes: 114 additions & 78 deletions internal/csi-addons/rbd/replication.go

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
Expand Down Expand Up @@ -651,7 +652,7 @@ func Test_getFlattenMode(t *testing.T) {
tests := []struct {
name string
args args
want corerbd.FlattenMode
want types.FlattenMode
wantErr bool
}{
{
Expand All @@ -660,27 +661,27 @@ func Test_getFlattenMode(t *testing.T) {
ctx: context.TODO(),
parameters: map[string]string{},
},
want: corerbd.FlattenModeNever,
want: types.FlattenModeNever,
},
{
name: "flattenMode option set to never",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeNever),
flattenModeKey: string(types.FlattenModeNever),
},
},
want: corerbd.FlattenModeNever,
want: types.FlattenModeNever,
},
{
name: "flattenMode option set to force",
args: args{
ctx: context.TODO(),
parameters: map[string]string{
flattenModeKey: string(corerbd.FlattenModeForce),
flattenModeKey: string(types.FlattenModeForce),
},
},
want: corerbd.FlattenModeForce,
want: types.FlattenModeForce,
},

{
Expand Down
16 changes: 14 additions & 2 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,12 +981,18 @@ func (cs *ControllerServer) DeleteVolume(
func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) {
mirroringInfo, err := rbdVol.GetImageMirroringInfo()
info, err := rbdVol.GetMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}
mirroringInfo, ok := info.(*librbd.MirrorImageInfo)
if !ok {
log.ErrorLog(ctx, "failed to get mirroring info for volume %s", rbdVol.RbdImageName)

return nil, status.Error(codes.Internal, "failed to get mirroring info")
}
// Cleanup only omap data if the following condition is met
// Mirroring is enabled on the image
// Local image is secondary
Expand All @@ -1000,10 +1006,16 @@ func cleanupRBDImage(ctx context.Context,
// the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, PVC and PV
// objects after failback operation.
localStatus, rErr := rbdVol.GetLocalState()
sts, rErr := rbdVol.GetLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
localStatus, ok := sts.(*librbd.SiteMirrorImageStatus)
if !ok {
log.ErrorLog(ctx, "failed to get local status for volume %s", rbdVol.RbdImageName)

return nil, status.Error(codes.Internal, "failed to get local status")
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)",
Expand Down
67 changes: 34 additions & 33 deletions internal/rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,25 @@ import (
"fmt"
"time"

"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"

librbd "github.com/ceph/go-ceph/rbd"
)

// FlattenMode is used to indicate the flatten mode for an RBD image.
type FlattenMode string

const (
// FlattenModeNever indicates that the image should never be flattened.
FlattenModeNever FlattenMode = "never"
// FlattenModeForce indicates that the image with the parent must be flattened.
FlattenModeForce FlattenMode = "force"
)

// HandleParentImageExistence checks the image's parent.
// if the parent image does not exist and is not in trash, it returns nil.
// if the flattenMode is FlattenModeForce, it flattens the image itself.
// if the parent image is in trash, it returns an error.
// if the parent image exists and is not enabled for mirroring, it returns an error.
func (rv *rbdVolume) HandleParentImageExistence(
ctx context.Context,
flattenMode FlattenMode,
mode types.FlattenMode,
) error {
if rv.ParentName == "" && !rv.ParentInTrash {
return nil
}

if flattenMode == FlattenModeForce {
if mode == types.FlattenModeForce {
// Delete temp image that exists for volume datasource since
// it is no longer required when the live image is flattened.
err := rv.DeleteTempImage(ctx)
Expand All @@ -72,14 +62,19 @@ func (rv *rbdVolume) HandleParentImageExistence(
if err != nil {
return err
}
parentMirroringInfo, err := parent.GetImageMirroringInfo()
parentMirroringInfo, err := parent.GetMirroringInfo()
if err != nil {
return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w",
parent, rv, err)
}
parentMirrorInfo, ok := parentMirroringInfo.(*librbd.MirrorImageInfo)
if !ok {
return fmt.Errorf("failed to get mirroring info of parent %q of image %q: "+
"unexpected type %T", parent, rv, parentMirroringInfo)
}

if parentMirroringInfo.State != librbd.MirrorImageEnabled {
if parentMirrorInfo.State != librbd.MirrorImageEnabled {
return fmt.Errorf("%w: failed to enable mirroring on image %q: "+
"parent image %q is not enabled for mirroring",
ErrFailedPrecondition, rv, parent)
Expand All @@ -88,8 +83,11 @@ func (rv *rbdVolume) HandleParentImageExistence(
return nil
}

// EnableImageMirroring enables mirroring on an image.
func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
// check that rbdVolume implements the types.Mirror interface.
var _ types.Mirror = &rbdVolume{}

// EnableMirroring enables mirroring on an image.
func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -104,8 +102,8 @@ func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
return nil
}

// DisableImageMirroring disables mirroring on an image.
func (ri *rbdImage) DisableImageMirroring(force bool) error {
// DisableMirroring disables mirroring on an image.
func (ri *rbdImage) DisableMirroring(force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -120,8 +118,8 @@ func (ri *rbdImage) DisableImageMirroring(force bool) error {
return nil
}

// GetImageMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
// GetMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetMirroringInfo() (interface{}, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -136,8 +134,8 @@ func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
return info, nil
}

// PromoteImage promotes image to primary.
func (ri *rbdImage) PromoteImage(force bool) error {
// Promote promotes image to primary.
func (ri *rbdImage) Promote(force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -151,10 +149,10 @@ func (ri *rbdImage) PromoteImage(force bool) error {
return nil
}

// ForcePromoteImage promotes image to primary with force option with 2 minutes
// ForcePromote promotes image to primary with force option with 2 minutes
// timeout. If there is no response within 2 minutes, the rbd CLI process will be
// killed and an error is returned.
func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error {
func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
promoteArgs := []string{
"mirror", "image", "promote",
rv.String(),
Expand All @@ -181,8 +179,8 @@ func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error {
return nil
}

// DemoteImage demotes image to secondary.
func (ri *rbdImage) DemoteImage() error {
// Demote demotes image to secondary.
func (ri *rbdImage) Demote() error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -196,8 +194,8 @@ func (ri *rbdImage) DemoteImage() error {
return nil
}

// resyncImage resync image to correct the split-brain.
func (ri *rbdImage) resyncImage() error {
// Resync resyncs the image to correct the split-brain.
func (ri *rbdImage) Resync() error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -208,11 +206,14 @@ func (ri *rbdImage) resyncImage() error {
return fmt.Errorf("failed to resync image %q with error: %w", ri, err)
}

return nil
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}

// GetImageMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) {
// GetGlobalMirroringStatus gets the global mirroring status of an image.
func (ri *rbdImage) GetGlobalMirroringStatus() (interface{}, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
Expand All @@ -227,7 +228,7 @@ func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus,
}

// GetLocalState returns the local state of the image.
func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) {
func (ri *rbdImage) GetLocalState() (interface{}, error) {
localStatus := librbd.SiteMirrorImageStatus{}
image, err := ri.open()
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (ri *rbdImage) String() string {
return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName)
}

// GetPoolName returns the name of the pool that holds the image, i.e. the
// value of the Pool field.
func (ri *rbdImage) GetPoolName() string {
return ri.Pool
}

// String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot.
func (rs *rbdSnapshot) String() string {
if rs.RadosNamespace != "" {
Expand Down Expand Up @@ -1594,9 +1598,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO
return nil
}

// GetImageCreationTime returns the creation time of the image. if the image
// GetCreationTime returns the creation time of the image. If the image
// creation time is not set, it queries the image info and returns the creation time.
func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) {
func (ri *rbdImage) GetCreationTime() (*timestamppb.Timestamp, error) {
if ri.CreatedAt != nil {
return ri.CreatedAt, nil
}
Expand Down
35 changes: 17 additions & 18 deletions internal/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,11 @@ import (
"context"
"fmt"

"github.com/ceph/ceph-csi/internal/rbd/types"

librbd "github.com/ceph/go-ceph/rbd"
)

// ResyncVol issues a resync of the image through resyncImage. It never returns
// nil: a failure of resyncImage is returned wrapped, and a successfully issued
// resync is reported as an error wrapping ErrUnavailable so that the caller
// retries. The localStatus parameter is not read by this function.
func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error {
if err := rv.resyncImage(); err != nil {
return fmt.Errorf("failed to resync image: %w", err)
}

// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}

// RepairResyncedImageID updates the existing image ID with the new one.
func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error {
// During resync operation the local image will get deleted and a new
Expand All @@ -54,11 +45,11 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro
return rv.repairImageID(ctx, j, true)
}

func (rv *rbdVolume) DisableVolumeReplication(
mirroringInfo *librbd.MirrorImageInfo,
func DisableVolumeReplication(mirror types.Mirror,
primary,
force bool,
) error {
if !mirroringInfo.Primary {
if !primary {
// Return success if the below condition is met
// Local image is secondary
// Local image is in up+replaying state
Expand All @@ -71,29 +62,37 @@ func (rv *rbdVolume) DisableVolumeReplication(
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rv.GetLocalState()
sts, rErr := mirror.GetLocalState()
if rErr != nil {
return fmt.Errorf("failed to get local state: %w", rErr)
}
localStatus, ok := sts.(librbd.SiteMirrorImageStatus)
if !ok {
return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument)
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return nil
}

return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.Up, localStatus.State)
}
err := rv.DisableImageMirroring(force)
err := mirror.DisableMirroring(force)
if err != nil {
return fmt.Errorf("failed to disable image mirroring: %w", err)
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rv.GetImageMirroringInfo()
info, err := mirror.GetMirroringInfo()
if err != nil {
return fmt.Errorf("failed to get mirroring info of image: %w", err)
}
mirroringInfo, ok := info.(*librbd.MirrorImageInfo)
if !ok {
return fmt.Errorf("failed to get mirroring info of image: unexpected type %T", info)
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID)
return fmt.Errorf("%w: image is in disabling state", ErrAborted)
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions internal/rbd/types/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ type VolumeGroup interface {

// ListVolumes returns a slice with all Volumes in the VolumeGroup.
ListVolumes(ctx context.Context) ([]Volume, error)

Mirror
}
Loading

0 comments on commit 1899b64

Please sign in to comment.