From 2709d3ae661b3398bad1d751ec36d77cdb6cc3ff Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 30 Jul 2024 18:42:35 +0200 Subject: [PATCH] rbd: add context to mirror interface adding required ctx to the mirror interface as ctx is required for the volumegroup operations. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 28 +++++++++++++------------- internal/rbd/controllerserver.go | 4 ++-- internal/rbd/mirror.go | 20 +++++++++--------- internal/rbd/replication.go | 7 ++++--- internal/rbd/types/mirror.go | 16 +++++++-------- 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index ec5a0e56074b..ff54dbf5741c 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -309,7 +309,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -322,7 +322,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, getGRPCError(err) } - err = mirror.EnableMirroring(mirroringMode) + err = mirror.EnableMirroring(ctx, mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -383,7 +383,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, return nil, err } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -396,7 +396,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabling.String(): return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) case librbd.MirrorImageEnabled.String(): - err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) + err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { return nil, getGRPCError(err) } @@ -454,7 +454,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -474,9 +474,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = mirror.ForcePromote(cr) + err = mirror.ForcePromote(ctx, cr) } else { - err = mirror.Promote(req.GetForce()) + err = mirror.Promote(ctx, req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -563,7 +563,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -592,7 +592,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = mirror.Demote() + err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -675,7 +675,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -693,7 +693,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - sts, err := mirror.GetGlobalMirroringStatus() + sts, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { // the image gets recreated after issuing resync if errors.Is(err, corerbd.ErrImageNotFound) { @@ -764,7 +764,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = mirror.Resync() + err = mirror.Resync(ctx) if err != nil { return nil, getGRPCError(err) } @@ -889,7 +889,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -905,7 +905,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := mirror.GetGlobalMirroringStatus() + mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index cdf1443a5bbb..56de60c58d01 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - info, err := rbdVol.GetMirroringInfo() + info, err := rbdVol.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -1007,7 +1007,7 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - sts, rErr := rbdVol.GetGlobalMirroringStatus() + sts, rErr := rbdVol.GetGlobalMirroringStatus(ctx) if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 1e490a176cff..4b75b6e1d914 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -63,7 +63,7 @@ func (rv *rbdVolume) HandleParentImageExistence( if err != nil { return fmt.Errorf("failed to get parent image %s: %w", rv, err) } - parentMirroringInfo, err := parent.GetMirroringInfo() + parentMirroringInfo, err := parent.GetMirroringInfo(ctx) if err != nil { return fmt.Errorf( "failed to get mirroring info of parent %q of image %q: %w", @@ -82,7 +82,7 @@ func (rv *rbdVolume) HandleParentImageExistence( var _ types.Mirror = &rbdVolume{} // EnableMirroring enables mirroring on an image. -func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { +func (ri *rbdImage) EnableMirroring(_ context.Context, mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -98,7 +98,7 @@ func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { } // DisableMirroring disables mirroring on an image. -func (ri *rbdImage) DisableMirroring(force bool) error { +func (ri *rbdImage) DisableMirroring(_ context.Context, force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -114,7 +114,7 @@ func (ri *rbdImage) DisableMirroring(force bool) error { } // GetMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { +func (ri *rbdImage) GetMirroringInfo(_ context.Context) (types.MirrorInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -130,7 +130,7 @@ func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { } // Promote promotes image to primary. -func (ri *rbdImage) Promote(force bool) error { +func (ri *rbdImage) Promote(_ context.Context, force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -147,7 +147,7 @@ func (ri *rbdImage) Promote(force bool) error { // ForcePromote promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromote(ctx context.Context, cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -157,7 +157,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { "--keyfile=" + cr.KeyFile, } _, stderr, err := util.ExecCommandWithTimeout( - context.TODO(), + ctx, // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. 2*time.Minute, "rbd", @@ -175,7 +175,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { } // Demote demotes image to secondary. -func (ri *rbdImage) Demote() error { +func (ri *rbdImage) Demote(_ context.Context) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -190,7 +190,7 @@ func (ri *rbdImage) Demote() error { } // Resync resync image to correct the split-brain. -func (ri *rbdImage) Resync() error { +func (ri *rbdImage) Resync(_ context.Context) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -208,7 +208,7 @@ func (ri *rbdImage) Resync() error { } // GetGlobalMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { +func (ri *rbdImage) GetGlobalMirroringStatus(_ context.Context) (types.GlobalStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index 86c31cd85fe8..badcb12ce62b 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -46,6 +46,7 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro } func DisableVolumeReplication(mirror types.Mirror, + ctx context.Context, primary, force bool, ) error { @@ -62,7 +63,7 @@ func DisableVolumeReplication(mirror types.Mirror, // disabled the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the volume // replication Kubernetes artifacts after failback operation. - sts, rErr := mirror.GetGlobalMirroringStatus() + sts, rErr := mirror.GetGlobalMirroringStatus(ctx) if rErr != nil { return fmt.Errorf("failed to get global state: %w", rErr) } @@ -78,13 +79,13 @@ func DisableVolumeReplication(mirror types.Mirror, return fmt.Errorf("%w: secondary image status is up=%t and state=%s", ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } - err := mirror.DisableMirroring(force) + err := mirror.DisableMirroring(ctx, force) if err != nil { return fmt.Errorf("failed to disable image mirroring: %w", err) } // the image state can be still disabling once we disable the mirroring // check the mirroring is disabled or not - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { return fmt.Errorf("failed to get mirroring info of image: %w", err) } diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go index 131fad8441a8..12c0bffdfe03 100644 --- a/internal/rbd/types/mirror.go +++ b/internal/rbd/types/mirror.go @@ -39,21 +39,21 @@ const ( // Mirror is the interface for managing mirroring on an RBD image or a group. type Mirror interface { // EnableMirroring enables mirroring on the resource with the specified mode. - EnableMirroring(mode librbd.ImageMirrorMode) error + EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error // DisableMirroring disables mirroring on the resource with the option to force the operation - DisableMirroring(force bool) error + DisableMirroring(ctx context.Context, force bool) error // Promote promotes the resource to primary status with the option to force the operation - Promote(force bool) error + Promote(ctx context.Context, force bool) error // ForcePromote promotes the resource to primary status with a timeout - ForcePromote(cr *util.Credentials) error + ForcePromote(ctx context.Context, cr *util.Credentials) error // Demote demotes the resource to secondary status - Demote() error + Demote(ctx context.Context) error // Resync resynchronizes the resource - Resync() error + Resync(ctx context.Context) error // GetMirroringInfo returns the mirroring information of the resource - GetMirroringInfo() (MirrorInfo, error) + GetMirroringInfo(ctx context.Context) (MirrorInfo, error) // GetMirroringInfo returns the mirroring information of the resource - GetGlobalMirroringStatus() (GlobalStatus, error) + GetGlobalMirroringStatus(ctx context.Context) (GlobalStatus, error) // AddSnapshotScheduling adds a snapshot scheduling to the resource AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error }