Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rbd: add context to mirror interface
Browse files Browse the repository at this point in the history
adding required ctx to the mirror
interface as ctx is required for
the volumegroup operations.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
Madhu-1 committed Jul 30, 2024
1 parent 2eb1e5e commit 2709d3a
Showing 5 changed files with 38 additions and 37 deletions.
28 changes: 14 additions & 14 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
@@ -309,7 +309,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -322,7 +322,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,

return nil, getGRPCError(err)
}
err = mirror.EnableMirroring(mirroringMode)
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -383,7 +383,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return nil, err
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -396,7 +396,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force)
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
if err != nil {
return nil, getGRPCError(err)
}
@@ -454,7 +454,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -474,9 +474,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed
err = mirror.ForcePromote(cr)
err = mirror.ForcePromote(ctx, cr)
} else {
err = mirror.Promote(req.GetForce())
err = mirror.Promote(ctx, req.GetForce())
}
if err != nil {
log.ErrorLog(ctx, err.Error())
@@ -563,7 +563,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -592,7 +592,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

err = mirror.Demote()
err = mirror.Demote(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -675,7 +675,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
// in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation.
@@ -693,7 +693,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is in primary state")
}

sts, err := mirror.GetGlobalMirroringStatus()
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) {
@@ -764,7 +764,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync()
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
}
@@ -889,7 +889,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}

info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -905,7 +905,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
}

mirrorStatus, err := mirror.GetGlobalMirroringStatus()
mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
4 changes: 2 additions & 2 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
@@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume(
func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) {
info, err := rbdVol.GetMirroringInfo()
info, err := rbdVol.GetMirroringInfo(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())

@@ -1007,7 +1007,7 @@ func cleanupRBDImage(ctx context.Context,
// the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, PVC and PV
// objects after failback operation.
sts, rErr := rbdVol.GetGlobalMirroringStatus()
sts, rErr := rbdVol.GetGlobalMirroringStatus(ctx)
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
20 changes: 10 additions & 10 deletions internal/rbd/mirror.go
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if err != nil {
return fmt.Errorf("failed to get parent image %s: %w", rv, err)
}
parentMirroringInfo, err := parent.GetMirroringInfo()
parentMirroringInfo, err := parent.GetMirroringInfo(ctx)
if err != nil {
return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w",
@@ -82,7 +82,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
var _ types.Mirror = &rbdVolume{}

// EnableMirroring enables mirroring on an image.
func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error {
func (ri *rbdImage) EnableMirroring(_ context.Context, mode librbd.ImageMirrorMode) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -98,7 +98,7 @@ func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error {
}

// DisableMirroring disables mirroring on an image.
func (ri *rbdImage) DisableMirroring(force bool) error {
func (ri *rbdImage) DisableMirroring(_ context.Context, force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -114,7 +114,7 @@ func (ri *rbdImage) DisableMirroring(force bool) error {
}

// GetMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) {
func (ri *rbdImage) GetMirroringInfo(_ context.Context) (types.MirrorInfo, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -130,7 +130,7 @@ func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) {
}

// Promote promotes image to primary.
func (ri *rbdImage) Promote(force bool) error {
func (ri *rbdImage) Promote(_ context.Context, force bool) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -147,7 +147,7 @@ func (ri *rbdImage) Promote(force bool) error {
// ForcePromote promotes image to primary with force option with 2 minutes
// timeout. If there is no response within 2 minutes,the rbd CLI process will be
// killed and an error is returned.
func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
func (rv *rbdVolume) ForcePromote(ctx context.Context, cr *util.Credentials) error {
promoteArgs := []string{
"mirror", "image", "promote",
rv.String(),
@@ -157,7 +157,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
"--keyfile=" + cr.KeyFile,
}
_, stderr, err := util.ExecCommandWithTimeout(
context.TODO(),
ctx,
// 2 minutes timeout as the Replication RPC timeout is 2.5 minutes.
2*time.Minute,
"rbd",
@@ -175,7 +175,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
}

// Demote demotes image to secondary.
func (ri *rbdImage) Demote() error {
func (ri *rbdImage) Demote(_ context.Context) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -190,7 +190,7 @@ func (ri *rbdImage) Demote() error {
}

// Resync resync image to correct the split-brain.
func (ri *rbdImage) Resync() error {
func (ri *rbdImage) Resync(_ context.Context) error {
image, err := ri.open()
if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@@ -208,7 +208,7 @@ func (ri *rbdImage) Resync() error {
}

// GetGlobalMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) {
func (ri *rbdImage) GetGlobalMirroringStatus(_ context.Context) (types.GlobalStatus, error) {
image, err := ri.open()
if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
7 changes: 4 additions & 3 deletions internal/rbd/replication.go
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro
}

func DisableVolumeReplication(mirror types.Mirror,
ctx context.Context,
primary,
force bool,
) error {
@@ -62,7 +63,7 @@ func DisableVolumeReplication(mirror types.Mirror,
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
sts, rErr := mirror.GetGlobalMirroringStatus()
sts, rErr := mirror.GetGlobalMirroringStatus(ctx)
if rErr != nil {
return fmt.Errorf("failed to get global state: %w", rErr)
}
@@ -78,13 +79,13 @@ func DisableVolumeReplication(mirror types.Mirror,
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
}
err := mirror.DisableMirroring(force)
err := mirror.DisableMirroring(ctx, force)
if err != nil {
return fmt.Errorf("failed to disable image mirroring: %w", err)
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
info, err := mirror.GetMirroringInfo()
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
return fmt.Errorf("failed to get mirroring info of image: %w", err)
}
16 changes: 8 additions & 8 deletions internal/rbd/types/mirror.go
Original file line number Diff line number Diff line change
@@ -39,21 +39,21 @@ const (
// Mirror is the interface for managing mirroring on an RBD image or a group.
type Mirror interface {
// EnableMirroring enables mirroring on the resource with the specified mode.
EnableMirroring(mode librbd.ImageMirrorMode) error
EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error
// DisableMirroring disables mirroring on the resource with the option to force the operation
DisableMirroring(force bool) error
DisableMirroring(ctx context.Context, force bool) error
// Promote promotes the resource to primary status with the option to force the operation
Promote(force bool) error
Promote(ctx context.Context, force bool) error
// ForcePromote promotes the resource to primary status with a timeout
ForcePromote(cr *util.Credentials) error
ForcePromote(ctx context.Context, cr *util.Credentials) error
// Demote demotes the resource to secondary status
Demote() error
Demote(ctx context.Context) error
// Resync resynchronizes the resource
Resync() error
Resync(ctx context.Context) error
// GetMirroringInfo returns the mirroring information of the resource
GetMirroringInfo() (MirrorInfo, error)
GetMirroringInfo(ctx context.Context) (MirrorInfo, error)
// GetMirroringInfo returns the mirroring information of the resource
GetGlobalMirroringStatus() (GlobalStatus, error)
GetGlobalMirroringStatus(ctx context.Context) (GlobalStatus, error)
// AddSnapshotScheduling adds a snapshot scheduling to the resource
AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error
}

0 comments on commit 2709d3a

Please sign in to comment.