diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 544907b4f497..df5c3732bcd4 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -27,7 +27,9 @@ import ( "time" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -93,12 +95,17 @@ type ReplicationServer struct { *replication.UnimplementedControllerServer // Embed ControllerServer as it implements helper functions *corerbd.ControllerServer + // csiID is the unique ID for this CSI-driver deployment. + csiID string } // NewReplicationServer creates a new ReplicationServer which handles // the Replication Service requests from the CSI-Addons specification. -func NewReplicationServer(c *corerbd.ControllerServer) *ReplicationServer { - return &ReplicationServer{ControllerServer: c} +func NewReplicationServer(instanceID string, c *corerbd.ControllerServer) *ReplicationServer { + return &ReplicationServer{ + ControllerServer: c, + csiID: instanceID, + } } func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) { @@ -124,18 +131,18 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er // getFlattenMode gets flatten mode from the input GRPC request parameters. // flattenMode is the key to check the mode in the parameters. -func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) { +func getFlattenMode(ctx context.Context, parameters map[string]string) (types.FlattenMode, error) { val, ok := parameters[flattenModeKey] if !ok { log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)", - flattenModeKey, corerbd.FlattenModeNever) + flattenModeKey, types.FlattenModeNever) - return corerbd.FlattenModeNever, nil + return types.FlattenModeNever, nil } - mode := corerbd.FlattenMode(val) + mode := types.FlattenMode(val) switch mode { - case corerbd.FlattenModeForce, corerbd.FlattenModeNever: + case types.FlattenModeForce, types.FlattenModeNever: return mode, nil } log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val) @@ -270,24 +277,27 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) if err != nil { @@ -299,21 +309,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { err = rbdVol.HandleParentImageExistence(ctx, flattenMode) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, getGRPCError(err) } - err = rbdVol.EnableImageMirroring(mirroringMode) + err = mirror.EnableMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -347,52 +356,54 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // extract the force option force, err := getForceOption(ctx, req.GetParameters()) if err != nil { return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - switch mirroringInfo.State { + switch info.GetState() { // image is already in disabled state - case librbd.MirrorImageDisabled: + case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling - case librbd.MirrorImageDisabling: + case librbd.MirrorImageDisabling.String(): return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) - case librbd.MirrorImageEnabled: - err = rbdVol.DisableVolumeReplication(mirroringInfo, force) + case librbd.MirrorImageEnabled.String(): + err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) if err != nil { return nil, getGRPCError(err) } return &replication.DisableVolumeReplicationResponse{}, nil default: - return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) + return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", info.GetState()) } return &replication.DisableVolumeReplicationResponse{}, nil @@ -422,48 +433,50 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // promote secondary to primary - if !mirroringInfo.Primary { + if !info.IsPrimary() { if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = rbdVol.ForcePromoteImage(cr) + err = mirror.ForcePromote(cr) } else { - err = rbdVol.PromoteImage(req.GetForce()) + err = mirror.Promote(req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -483,7 +496,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, interval, startTime := getSchedulingDetails(req.GetParameters()) if interval != admin.NoInterval { - err = rbdVol.AddSnapshotScheduling(interval, startTime) + err = mirror.AddSnapshotScheduling(interval, startTime) if err != nil { return nil, err } @@ -522,49 +535,51 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - creationTime, err := rbdVol.GetImageCreationTime() + creationTime, err := rbdVol.GetCreationTime() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // demote image to secondary - if mirroringInfo.Primary { + if info.IsPrimary() { // store the image creation time for resync _, err = rbdVol.GetMetadata(imageCreationTimeKey) if err != nil && errors.Is(err, librbd.ErrNotFound) { @@ -577,7 +592,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = rbdVol.DemoteImage() + err = mirror.Demote() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -590,22 +605,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, // checkRemoteSiteStatus checks the state of the remote cluster. // It returns true if the state of the remote cluster is up and unknown. -func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool { +func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) bool { ready := true found := false - for _, s := range mirrorStatus.SiteStatuses { + for _, s := range mirrorStatus { log.UsefulLog( ctx, "peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d", - s.MirrorUUID, - s.Up, - s.State, - s.Description, - s.LastUpdate) - if s.MirrorUUID != "" { + s.GetMirrorUUID(), + s.IsUP(), + s.GetState(), + s.GetDescription(), + s.GetLastUpdate()) + if s.GetMirrorUUID() != "" { found = true // If ready is already "false" do not flip it based on another remote peer status - if ready && (s.State != librbd.MirrorImageStatusStateUnknown || !s.Up) { + if ready && (s.GetState() != librbd.MirrorImageStatusStateUnknown.String() || !s.IsUP()) { ready = false } } @@ -639,26 +654,28 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -667,22 +684,22 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is still primary - if mirroringInfo.Primary { + if info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + sts, err := mirror.GetGlobalMirroringStatus() if err != nil { // the image gets recreated after issuing resync if errors.Is(err, corerbd.ErrImageNotFound) { // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not - // be executed as the error would get returned due to getImageMirroringInfo + // be executed as the error would get returned due to getMirroringInfo // failing to find an image above. return nil, status.Error(codes.Aborted, err.Error()) } @@ -692,22 +709,20 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } ready := false - localStatus, err := mirrorStatus.LocalStatus() + localStatus, err := sts.GetLocalSiteStatus() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, fmt.Errorf("failed to get local status: %w", err) } - // convert the last update time to UTC - lastUpdateTime := time.Unix(localStatus.LastUpdate, 0).UTC() log.UsefulLog( ctx, "local status: daemon up=%t, image mirroring state=%q, description=%q and lastUpdate=%s", - localStatus.Up, - localStatus.State, - localStatus.Description, - lastUpdateTime) + localStatus.IsUP(), + localStatus.GetState(), + localStatus.GetDescription(), + localStatus.GetLastUpdate()) // To recover from split brain (up+error) state the image need to be // demoted and requested for resync on site-a and then the image on site-b @@ -719,11 +734,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, // If the image state on both the sites are up+unknown consider that // complete data is synced as the last snapshot // gets exchanged between the clusters. - if localStatus.State == librbd.MirrorImageStatusStateUnknown && localStatus.Up { - ready = checkRemoteSiteStatus(ctx, mirrorStatus) + if localStatus.GetState() == librbd.MirrorImageStatusStateUnknown.String() && localStatus.IsUP() { + ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetImageCreationTime() + creationTime, err := rbdVol.GetCreationTime() if err != nil { return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) } @@ -749,7 +764,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = rbdVol.ResyncVol(localStatus) + err = mirror.Resync() if err != nil { return nil, getGRPCError(err) } @@ -853,42 +868,44 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets()) + defer mgr.Destroy(ctx) + + rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror, err := rbdVol.ToMirror() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is not in primary state - if !mirroringInfo.Primary { + if !info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + mirrorStatus, err := mirror.GetGlobalMirroringStatus() if err != nil { if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) @@ -898,14 +915,14 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - remoteStatus, err := RemoteStatus(ctx, mirrorStatus) + remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err) } - description := remoteStatus.Description + description := remoteStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) { @@ -919,36 +936,6 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return resp, nil } -// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses -// slice that corresponds to the remote site's status. If the remote status -// is not found than the error ErrNotExist will be returned. -func RemoteStatus(ctx context.Context, gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageStatus, error) { - var ( - ss librbd.SiteMirrorImageStatus - err error = librbd.ErrNotExist - ) - - for i := range gmis.SiteStatuses { - log.DebugLog( - ctx, - "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", - gmis.SiteStatuses[i].MirrorUUID, - gmis.SiteStatuses[i].State, - gmis.SiteStatuses[i].Description, - gmis.SiteStatuses[i].LastUpdate, - gmis.SiteStatuses[i].Up) - - if gmis.SiteStatuses[i].MirrorUUID != "" { - ss = gmis.SiteStatuses[i] - err = nil - - break - } - } - - return ss, err -} - // This function gets the local snapshot time, last sync snapshot seconds // and last sync bytes from the description of localStatus and convert // it into required types. @@ -1015,12 +1002,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV return &response, nil } -func checkVolumeResyncStatus(ctx context.Context, localStatus librbd.SiteMirrorImageStatus) error { +func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error { // we are considering local snapshot timestamp to check if the resync is // started or not, if we dont see local_snapshot_timestamp in the // description of localStatus, we are returning error. if we see the local // snapshot timestamp in the description we return resyncing started. - description := localStatus.Description + description := localStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { return fmt.Errorf("failed to get last sync info: %w", err) diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index c678c6377236..6da1b929eea7 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -26,6 +26,7 @@ import ( "time" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" librbd "github.com/ceph/go-ceph/rbd" "github.com/ceph/go-ceph/rbd/admin" @@ -219,30 +220,36 @@ func TestCheckVolumeResyncStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args librbd.SiteMirrorImageStatus + args corerbd.SiteMirrorImageStatus wantErr bool }{ { name: "test when local_snapshot_timestamp is non zero", - args: librbd.SiteMirrorImageStatus{ - //nolint:lll // sample output cannot be split into multiple lines. - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + //nolint:lll // sample output cannot be split into multiple lines. + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: false, }, { name: "test when local_snapshot_timestamp is zero", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, { name: "test when local_snapshot_timestamp is not present", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, @@ -261,17 +268,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args *librbd.GlobalMirrorImageStatus + args corerbd.GlobalMirrorStatus wantReady bool }{ { name: "Test a single peer in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -279,17 +288,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a single peer in sync, including a local instance", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -297,17 +308,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a multiple peers in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -315,19 +328,23 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test no remote peers", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{}, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{}, + }, }, wantReady: false, }, { name: "Test single peer not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateReplaying, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateReplaying, + Up: true, + }, }, }, }, @@ -335,12 +352,14 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test single peer not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -348,17 +367,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when first peer is not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateStoppingReplay, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateStoppingReplay, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -366,17 +387,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when second peer is not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -386,7 +409,7 @@ func TestCheckRemoteSiteStatus(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - if ready := checkRemoteSiteStatus(context.TODO(), tt.args); ready != tt.wantReady { + if ready := checkRemoteSiteStatus(context.TODO(), tt.args.GetAllSitesStatus()); ready != tt.wantReady { t.Errorf("checkRemoteSiteStatus() ready = %v, expect ready = %v", ready, tt.wantReady) } }) @@ -651,7 +674,7 @@ func Test_getFlattenMode(t *testing.T) { tests := []struct { name string args args - want corerbd.FlattenMode + want types.FlattenMode wantErr bool }{ { @@ -660,27 +683,27 @@ func Test_getFlattenMode(t *testing.T) { ctx: context.TODO(), parameters: map[string]string{}, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to never", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeNever), + flattenModeKey: string(types.FlattenModeNever), }, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to force", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeForce), + flattenModeKey: string(types.FlattenModeForce), }, }, - want: corerbd.FlattenModeForce, + want: types.FlattenModeForce, }, { diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 2ad9fe85464d..cdf1443a5bbb 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := rbdVol.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -998,7 +998,7 @@ func cleanupRBDImage(ctx context.Context, // Mirroring is enabled on the image // Local image is secondary // Local image is in up+replaying state - if mirroringInfo.State == librbd.MirrorImageEnabled && !mirroringInfo.Primary { + if info.GetState() == librbd.MirrorImageEnabled.String() && !info.IsPrimary() { // If the image is in a secondary state and its up+replaying means its // an healthy secondary and the image is primary somewhere in the // remote cluster and the local image is getting replayed. Delete the @@ -1007,11 +1007,18 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - localStatus, rErr := rbdVol.GetLocalState() + sts, rErr := rbdVol.GetGlobalMirroringStatus() if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + + localStatus, rErr := sts.GetLocalSiteStatus() + if rErr != nil { + log.ErrorLog(ctx, "failed to get local status for volume %s: %w", rbdVol.RbdImageName, rErr) + + return nil, status.Error(codes.Internal, rErr.Error()) + } + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { if err = undoVolReservation(ctx, rbdVol, cr); err != nil { log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)", rbdVol.RequestName, rbdVol.RbdImageName, err) @@ -1023,8 +1030,8 @@ func cleanupRBDImage(ctx context.Context, } log.ErrorLog(ctx, "secondary image status is up=%t and state=%s", - localStatus.Up, - localStatus.State) + localStatus.IsUP(), + localStatus.GetState()) } inUse, err := rbdVol.isInUse() diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 979da3b1c5a7..fd0dda43aa97 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -219,7 +219,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { fcs := casrbd.NewFenceControllerServer() r.cas.RegisterService(fcs) - rcs := casrbd.NewReplicationServer(NewControllerServer(r.cd)) + rcs := casrbd.NewReplicationServer(rbd.CSIInstanceID, NewControllerServer(r.cd)) r.cas.RegisterService(rcs) vgcs := casrbd.NewVolumeGroupServer(conf.InstanceID) diff --git a/internal/rbd/group.go b/internal/rbd/group.go index 46c7739d7d5c..d0a390322200 100644 --- a/internal/rbd/group.go +++ b/internal/rbd/group.go @@ -77,3 +77,7 @@ func (rv *rbdVolume) RemoveFromGroup(ctx context.Context, vg types.VolumeGroup) return librbd.GroupImageRemove(ioctx, name, rv.ioctx, rv.RbdImageName) } + +func (rv *rbdVolume) ToMirror() (types.Mirror, error) { + return rv, nil +} diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 106a9eb77769..905db1c44ab6 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -20,21 +20,13 @@ import ( "fmt" "time" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" librbd "github.com/ceph/go-ceph/rbd" ) -// FlattenMode is used to indicate the flatten mode for an RBD image. -type FlattenMode string - -const ( - // FlattenModeNever indicates that the image should never be flattened. - FlattenModeNever FlattenMode = "never" - // FlattenModeForce indicates that the image with the parent must be flattened. - FlattenModeForce FlattenMode = "force" -) - // HandleParentImageExistence checks the image's parent. // if the parent image does not exist and is not in trash, it returns nil. // if the flattenMode is FlattenModeForce, it flattens the image itself. @@ -42,13 +34,12 @@ const ( // if the parent image exists and is not enabled for mirroring, it returns an error. func (rv *rbdVolume) HandleParentImageExistence( ctx context.Context, - flattenMode FlattenMode, + mode types.FlattenMode, ) error { if rv.ParentName == "" && !rv.ParentInTrash { return nil } - - if flattenMode == FlattenModeForce { + if mode == types.FlattenModeForce { // Delete temp image that exists for volume datasource since // it is no longer required when the live image is flattened. err := rv.DeleteTempImage(ctx) @@ -72,14 +63,13 @@ func (rv *rbdVolume) HandleParentImageExistence( if err != nil { return err } - parentMirroringInfo, err := parent.GetImageMirroringInfo() + parentMirroringInfo, err := parent.GetMirroringInfo() if err != nil { return fmt.Errorf( "failed to get mirroring info of parent %q of image %q: %w", parent, rv, err) } - - if parentMirroringInfo.State != librbd.MirrorImageEnabled { + if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() { return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ "parent image %q is not enabled for mirroring", ErrFailedPrecondition, rv, parent) @@ -88,8 +78,11 @@ func (rv *rbdVolume) HandleParentImageExistence( return nil } -// EnableImageMirroring enables mirroring on an image. -func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { +// check that rbdVolume implements the types.Mirror interface. +var _ types.Mirror = &rbdVolume{} + +// EnableMirroring enables mirroring on an image. +func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -104,8 +97,8 @@ func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { return nil } -// DisableImageMirroring disables mirroring on an image. -func (ri *rbdImage) DisableImageMirroring(force bool) error { +// DisableMirroring disables mirroring on an image. +func (ri *rbdImage) DisableMirroring(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -120,8 +113,8 @@ func (ri *rbdImage) DisableImageMirroring(force bool) error { return nil } -// GetImageMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { +// GetMirroringInfo gets mirroring information of an image. +func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -133,11 +126,11 @@ func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { return nil, fmt.Errorf("failed to get mirroring info of %q with error: %w", ri, err) } - return info, nil + return ImageStatus{MirrorImageInfo: info}, nil } -// PromoteImage promotes image to primary. -func (ri *rbdImage) PromoteImage(force bool) error { +// Promote promotes image to primary. +func (ri *rbdImage) Promote(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -151,10 +144,10 @@ func (ri *rbdImage) PromoteImage(force bool) error { return nil } -// ForcePromoteImage promotes image to primary with force option with 2 minutes +// ForcePromote promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -181,8 +174,8 @@ func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { return nil } -// DemoteImage demotes image to secondary. -func (ri *rbdImage) DemoteImage() error { +// Demote demotes image to secondary. +func (ri *rbdImage) Demote() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -196,8 +189,8 @@ func (ri *rbdImage) DemoteImage() error { return nil } -// resyncImage resync image to correct the split-brain. -func (ri *rbdImage) resyncImage() error { +// Resync resync image to correct the split-brain. +func (ri *rbdImage) Resync() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -208,11 +201,14 @@ func (ri *rbdImage) resyncImage() error { return fmt.Errorf("failed to resync image %q with error: %w", ri, err) } - return nil + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) } -// GetImageMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { +// GetGlobalMirroringStatus get the mirroring status of an image. +func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -223,26 +219,110 @@ func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, return nil, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) } - return &statusInfo, nil + return GlobalMirrorStatus{GlobalMirrorImageStatus: statusInfo}, nil } -// GetLocalState returns the local state of the image. -func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) { - localStatus := librbd.SiteMirrorImageStatus{} - image, err := ri.open() +// ImageStatus is a wrapper around librbd.MirrorImageInfo that contains the +// image mirror status. +type ImageStatus struct { + *librbd.MirrorImageInfo +} + +func (status ImageStatus) GetState() string { + return status.State.String() +} + +func (status ImageStatus) IsPrimary() bool { + return status.Primary +} + +// GlobalMirrorStatus is a wrapper around librbd.GlobalMirrorImageStatus that contains the +// global mirror image status. +type GlobalMirrorStatus struct { + librbd.GlobalMirrorImageStatus +} + +func (status GlobalMirrorStatus) GetState() string { + return status.GlobalMirrorImageStatus.Info.State.String() +} + +func (status GlobalMirrorStatus) IsPrimary() bool { + return status.GlobalMirrorImageStatus.Info.Primary +} + +func (status GlobalMirrorStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorImageStatus.LocalStatus() if err != nil { - return localStatus, fmt.Errorf("failed to open image %q with error: %w", ri, err) + err = fmt.Errorf("failed to get local site status: %w", err) } - defer image.Close() - statusInfo, err := image.GetGlobalMirrorStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) + return SiteMirrorImageStatus{ + SiteMirrorImageStatus: s, + }, err +} + +func (status GlobalMirrorStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for _, ss := range status.SiteStatuses { + siteStatuses = append(siteStatuses, SiteMirrorImageStatus{SiteMirrorImageStatus: ss}) } - localStatus, err = statusInfo.LocalStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get local status: %w", err) + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status GlobalMirrorStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorImageStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } } - return localStatus, nil + return SiteMirrorImageStatus{SiteMirrorImageStatus: ss}, err +} + +// SiteMirrorImageStatus is a wrapper around librbd.SiteMirrorImageStatus that contains the +// site mirror image status. +type SiteMirrorImageStatus struct { + librbd.SiteMirrorImageStatus +} + +func (status SiteMirrorImageStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status SiteMirrorImageStatus) GetState() string { + return status.State.String() +} + +func (status SiteMirrorImageStatus) GetDescription() string { + return status.Description +} + +func (status SiteMirrorImageStatus) IsUP() bool { + return status.Up +} + +func (status SiteMirrorImageStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() } diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 884d66a2b2d4..28437a921346 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -413,6 +413,10 @@ func (ri *rbdImage) String() string { return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName) } +func (ri *rbdImage) GetPoolName() string { + return ri.Pool +} + // String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot. func (rs *rbdSnapshot) String() string { if rs.RadosNamespace != "" { @@ -1594,9 +1598,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO return nil } -// GetImageCreationTime returns the creation time of the image. if the image +// GetCreationTime returns the creation time of the image. if the image // creation time is not set, it queries the image info and returns the creation time. -func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) { +func (ri *rbdImage) GetCreationTime() (*timestamppb.Timestamp, error) { if ri.CreatedAt != nil { return ri.CreatedAt, nil } diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index c6b4c55ddcc9..86c31cd85fe8 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -20,20 +20,11 @@ import ( "context" "fmt" + "github.com/ceph/ceph-csi/internal/rbd/types" + librbd "github.com/ceph/go-ceph/rbd" ) -func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error { - if err := rv.resyncImage(); err != nil { - return fmt.Errorf("failed to resync image: %w", err) - } - - // If we issued a resync, return a non-final error as image needs to be recreated - // locally. Caller retries till RBD syncs an initial version of the image to - // report its status in the resync request. - return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) -} - // repairResyncedImageID updates the existing image ID with new one. func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error { // During resync operation the local image will get deleted and a new @@ -54,11 +45,11 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro return rv.repairImageID(ctx, j, true) } -func (rv *rbdVolume) DisableVolumeReplication( - mirroringInfo *librbd.MirrorImageInfo, +func DisableVolumeReplication(mirror types.Mirror, + primary, force bool, ) error { - if !mirroringInfo.Primary { + if !primary { // Return success if the below condition is met // Local image is secondary // Local image is in up+replaying state @@ -71,29 +62,35 @@ func (rv *rbdVolume) DisableVolumeReplication( // disabled the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the volume // replication Kubernetes artifacts after failback operation. - localStatus, rErr := rv.GetLocalState() + sts, rErr := mirror.GetGlobalMirroringStatus() if rErr != nil { - return fmt.Errorf("failed to get local state: %w", rErr) + return fmt.Errorf("failed to get global state: %w", rErr) + } + + localStatus, err := sts.GetLocalSiteStatus() + if err != nil { + return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { return nil } return fmt.Errorf("%w: secondary image status is up=%t and state=%s", - ErrInvalidArgument, localStatus.Up, localStatus.State) + ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } - err := rv.DisableImageMirroring(force) + err := mirror.DisableMirroring(force) if err != nil { return fmt.Errorf("failed to disable image mirroring: %w", err) } // the image state can be still disabling once we disable the mirroring // check the mirroring is disabled or not - mirroringInfo, err = rv.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { return fmt.Errorf("failed to get mirroring info of image: %w", err) } - if mirroringInfo.State == librbd.MirrorImageDisabling { - return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID) + + if info.GetState() == librbd.MirrorImageDisabling.String() { + return fmt.Errorf("%w: image is in disabling state", ErrAborted) } return nil diff --git a/internal/rbd/types/volume.go b/internal/rbd/types/volume.go index f6758478f764..7f534091f42e 100644 --- a/internal/rbd/types/volume.go +++ b/internal/rbd/types/volume.go @@ -59,4 +59,7 @@ type Volume interface { // if the parent image is in trash, it returns an error. // if the parent image exists and is not enabled for mirroring, it returns an error. HandleParentImageExistence(ctx context.Context, flattenMode FlattenMode) error + + // ToMirror converts the Volume to a Mirror. + ToMirror() (Mirror, error) }