From 9575e5ace800c7ea46e62ca0eaded7cf99ac9b69 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Thu, 24 Oct 2024 10:27:10 +0200 Subject: [PATCH 1/6] rbd: implement volume group using go-ceph This adds the required functionality to call the go-ceph API's for the rbd volume group. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 17 +- internal/rbd/group/group_mirror.go | 328 +++++++++++++++++++++++++ 2 files changed, 338 insertions(+), 7 deletions(-) create mode 100644 internal/rbd/group/group_mirror.go diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index be0864b6061..925139f7f09 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -29,11 +29,13 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/csi-addons/spec/lib/go/replication" "google.golang.org/grpc" @@ -785,13 +787,14 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ - corerbd.ErrImageNotFound: codes.NotFound, - util.ErrPoolNotFound: codes.NotFound, - corerbd.ErrInvalidArgument: codes.InvalidArgument, - corerbd.ErrFlattenInProgress: codes.Aborted, - corerbd.ErrAborted: codes.Aborted, - corerbd.ErrFailedPrecondition: codes.FailedPrecondition, - corerbd.ErrUnavailable: codes.Unavailable, + corerbd.ErrImageNotFound: codes.NotFound, + util.ErrPoolNotFound: codes.NotFound, + corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, + corerbd.ErrAborted: codes.Aborted, + corerbd.ErrFailedPrecondition: codes.FailedPrecondition, + corerbd.ErrUnavailable: 
codes.Unavailable, + rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable, } for e, code := range errorStatusMap { diff --git a/internal/rbd/group/group_mirror.go b/internal/rbd/group/group_mirror.go new file mode 100644 index 00000000000..5d68597e756 --- /dev/null +++ b/internal/rbd/group/group_mirror.go @@ -0,0 +1,328 @@ +/* +Copyright 2024 The Ceph-CSI Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package group + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ceph/go-ceph/rados" + librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" + + "github.com/ceph/ceph-csi/internal/rbd/types" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" +) + +var ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable") + +type volumeGroupMirror struct { + *volumeGroup +} + +func (vg volumeGroupMirror) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupEnable(ioctx, name, mode) + if err != nil { + return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg) + + return nil +} + +func (vg volumeGroupMirror) DisableMirroring(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + 
ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDisable(ioctx, name, force) + if err != nil && !errors.Is(rados.ErrNotFound, err) { + return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg) + + return nil +} + +func (vg volumeGroupMirror) Promote(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupPromote(ioctx, name, force) + if err != nil { + return fmt.Errorf("failed to promote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been promoted", vg) + + return nil +} + +func (vg volumeGroupMirror) ForcePromote(ctx context.Context, cr *util.Credentials) error { + promoteArgs := []string{ + "mirror", "group", "promote", + vg.String(), + "--force", + "--id", cr.ID, + "-m", vg.monitors, + "--keyfile=" + cr.KeyFile, + } + _, stderr, err := util.ExecCommandWithTimeout( + ctx, + // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. 
+ 2*time.Minute, + "rbd", + promoteArgs..., + ) + if err != nil { + return fmt.Errorf("failed to promote group %q with error: %w", vg, err) + } + + if stderr != "" { + return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr) + } + + log.DebugLog(ctx, "volume group %q has been force promoted", vg) + + return nil +} + +func (vg volumeGroupMirror) Demote(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDemote(ioctx, name) + if err != nil { + return fmt.Errorf("failed to demote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been demoted", vg) + + return nil +} + +func (vg volumeGroupMirror) Resync(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupResync(ioctx, name) + if err != nil { + return fmt.Errorf("failed to resync volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "issued resync on volume group %q", vg) + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. 
+ return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable) +} + +func (vg volumeGroupMirror) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + + info, err := librbd.GetMirrorGroupInfo(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring info %q: %w", vg, err) + } + + return &groupInfo{MirrorGroupInfo: info}, nil +} + +func (vg volumeGroupMirror) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + statusInfo, err := librbd.GetGlobalMirrorGroupStatus(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring status %q: %w", vg, err) + } + + return globalMirrorGroupStatus{GlobalMirrorGroupStatus: &statusInfo}, nil +} + +func (vg volumeGroupMirror) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error { + ls := admin.NewLevelSpec(vg.pool, vg.namespace, "") + ra, err := vg.conn.GetRBDAdmin() + if err != nil { + return err + } + adminConn := ra.MirrorSnashotSchedule() + err = adminConn.Add(ls, interval, startTime) + if err != nil { + return err + } + + return nil +} + +// groupInfo is a wrapper around librbd.MirrorGroupInfo that contains the +// group mirror info. +type groupInfo struct { + *librbd.MirrorGroupInfo +} + +func (info *groupInfo) GetState() string { + return info.State.String() +} + +func (info *groupInfo) IsPrimary() bool { + return info.Primary +} + +// globalMirrorGroupStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the +// global mirror group status. 
+type globalMirrorGroupStatus struct { + *librbd.GlobalMirrorGroupStatus +} + +func (status globalMirrorGroupStatus) GetState() string { + return status.GlobalMirrorGroupStatus.Info.State.String() +} + +func (status globalMirrorGroupStatus) IsPrimary() bool { + return status.GlobalMirrorGroupStatus.Info.Primary +} + +func (status globalMirrorGroupStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorGroupStatus.LocalStatus() + if err != nil { + err = fmt.Errorf("failed to get local site status: %w", err) + } + + return siteMirrorGroupStatus{ + SiteMirrorGroupStatus: &s, + }, err +} + +func (status globalMirrorGroupStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for i := range status.SiteStatuses { + siteStatuses = append(siteStatuses, siteMirrorGroupStatus{SiteMirrorGroupStatus: &status.SiteStatuses[i]}) + } + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorGroupStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status globalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorGroupStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } + } + + return siteMirrorGroupStatus{SiteMirrorGroupStatus: &ss}, err +} + +// siteMirrorGroupStatus is a wrapper around librbd.SiteMirrorGroupStatus that contains the +// site mirror group status. 
+type siteMirrorGroupStatus struct { + *librbd.SiteMirrorGroupStatus +} + +func (status siteMirrorGroupStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status siteMirrorGroupStatus) GetState() string { + return status.State.String() +} + +func (status siteMirrorGroupStatus) GetDescription() string { + return status.Description +} + +func (status siteMirrorGroupStatus) IsUP() bool { + return status.Up +} + +func (status siteMirrorGroupStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() +} From 3afb22c8ac60a565725c209bf7cfbbd9f741dd89 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 19 Nov 2024 11:26:00 +0100 Subject: [PATCH 2/6] rbd: implement GetMirrorSource in manager implementing GetMirrorSource in manager to return volume or the volumegroup based on the replication source, if replication source is nil return the volume details for backward compatibility. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 293 +++++++++++++------------ internal/csi-addons/rbd/volumegroup.go | 12 +- internal/rbd/group/volume_group.go | 4 + internal/rbd/manager.go | 78 +++++++ internal/rbd/types/group.go | 3 + internal/rbd/types/manager.go | 6 + 6 files changed, 240 insertions(+), 156 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 925139f7f09..bd9151d8e61 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -257,9 +257,9 @@ func validateSchedulingInterval(interval string) error { func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, req *replication.EnableVolumeReplicationRequest, ) (*replication.EnableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := 
csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -272,24 +272,23 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) @@ -309,17 +308,18 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } if info.GetState() != librbd.MirrorImageEnabled.String() { - err = rbdVol.HandleParentImageExistence(ctx, flattenMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) - - return nil, getGRPCError(err) - } - err = mirror.EnableMirroring(ctx, mirroringMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, rbdVol := range volumes { + err = rbdVol.HandleParentImageExistence(ctx, 
flattenMode) + if err != nil { + err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err) + return nil, getGRPCError(err) + } + err = mirror.EnableMirroring(ctx, mirroringMode) + if err != nil { + log.ErrorLog(ctx, err.Error()) - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } } } @@ -332,9 +332,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, req *replication.DisableVolumeReplicationRequest, ) (*replication.DisableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -342,24 +342,23 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, 
err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the force option force, err := getForceOption(ctx, req.GetParameters()) @@ -378,7 +377,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling case librbd.MirrorImageDisabling.String(): - return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) + return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID) case librbd.MirrorImageEnabled.String(): err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { @@ -400,9 +399,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context, req *replication.PromoteVolumeRequest, ) (*replication.PromoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -410,24 +409,23 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer 
rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -440,7 +438,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } @@ -477,10 +475,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } log.DebugLog( ctx, - "Added scheduling at interval %s, start time %s for volume %s", + "Added scheduling at interval %s, start time %s for Id %s", interval, startTime, - rbdVol) + reqID) } return &replication.PromoteVolumeResponse{}, nil @@ -493,9 +491,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context, req *replication.DemoteVolumeRequest, ) (*replication.DemoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -503,31 +501,23 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, 
volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) - if err != nil { - return nil, getGRPCError(err) - } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - creationTime, err := rbdVol.GetCreationTime(ctx) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - log.ErrorLog(ctx, err.Error()) + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -540,24 +530,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } // demote image to secondary if info.IsPrimary() { - // store the image creation time for resync - _, err = rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && errors.Is(err, librbd.ErrNotFound) { - log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol) - err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) - } - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, vol := range volumes { + // store the image creation time for resync + creationTime, cErr := vol.GetCreationTime(ctx) + if cErr != nil { + log.ErrorLog(ctx, 
cErr.Error()) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, cErr.Error()) + } + // store the image creation time for resync + _, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && errors.Is(err, librbd.ErrNotFound) { + log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol) + err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) + } + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) + } + } err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -604,9 +603,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -614,23 +613,23 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } defer rs.VolumeLocks.Release(volumeID) + mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, 
volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -695,35 +694,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetCreationTime(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) - } - - // image creation time is stored in the image metadata. it looks like - // `"seconds:1692879841 nanos:631526669"` - // If the image gets resynced the local image creation time will be - // lost, if the keys is not present in the image metadata then we can - // assume that the image is already resynced. - savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && !errors.Is(err, librbd.ErrNotFound) { - return nil, status.Errorf(codes.Internal, - "failed to get %s key from image metadata for %s: %s", - imageCreationTimeKey, - rbdVol, - err.Error()) - } + for _, vol := range volumes { + creationTime, tErr := vol.GetCreationTime(ctx) + if tErr != nil { + return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error()) + } - if savedImageTime != "" { - st, sErr := timestampFromString(savedImageTime) - if sErr != nil { - return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + // image creation time is stored in the image metadata. 
it looks like + // `"seconds:1692879841 nanos:631526669"` + // If the image gets resynced the local image creation time will be + // lost, if the keys is not present in the image metadata then we can + // assume that the image is already resynced. + var savedImageTime string + savedImageTime, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && !errors.Is(err, librbd.ErrNotFound) { + return nil, status.Errorf(codes.Internal, + "failed to get %s key from image metadata for %s: %s", + imageCreationTimeKey, + vol, + err.Error()) } - log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime) - if req.GetForce() && st.Equal(*creationTime) { - err = mirror.Resync(ctx) - if err != nil { - return nil, getGRPCError(err) + + if savedImageTime != "" { + st, sErr := timestampFromString(savedImageTime) + if sErr != nil { + return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + } + log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime) + if req.GetForce() && st.Equal(*creationTime) { + err = mirror.Resync(ctx) + if err != nil { + return nil, getGRPCError(err) + } + // Break the loop as we need to issue resync only once for the image or for the group. 
+ break } } } @@ -735,9 +739,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } } - err = rbdVol.RepairResyncedImageID(ctx, ready) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + // update imageID for all the volumes + for _, vol := range volumes { + err = vol.RepairResyncedImageID(ctx, ready) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + } } resp := &replication.ResyncVolumeResponse{ @@ -812,9 +819,9 @@ func getGRPCError(err error) error { func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, req *replication.GetVolumeReplicationInfoRequest, ) (*replication.GetVolumeReplicationInfoResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -824,36 +831,23 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) + mgr := rbd.NewManager(rs.driverInstance, nil, req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) - if err != nil { - log.ErrorLog(ctx, "failed to get volume with id %q: 
%v", volumeID, err) - - switch { - case errors.Is(err, corerbd.ErrImageNotFound): - err = status.Error(codes.NotFound, err.Error()) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Error(codes.NotFound, err.Error()) - default: - err = status.Error(codes.Internal, err.Error()) - } - - return nil, err - } - mirror, err := rbdVol.ToMirror() + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - log.ErrorLog(ctx, "failed to convert volume %q to mirror type: %v", rbdVol, err) + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -990,3 +984,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) return nil } + +// destoryVolumes destroys the volume connections. +func destoryVolumes(ctx context.Context, volumes []types.Volume) { + for _, vol := range volumes { + vol.Destroy(ctx) + } +} diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 4727e548b80..dd349ac8af0 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -90,11 +90,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( // resolve all volumes volumes := make([]types.Volume, len(req.GetVolumeIds())) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range req.GetVolumeIds() { vol, err := mgr.GetVolumeByID(ctx, id) if err != nil { @@ -356,11 +352,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( // resolve all volumes volumes := make([]types.Volume, len(toAdd)) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range toAdd { var vol types.Volume vol, err = 
mgr.GetVolumeByID(ctx, id) diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index e8414f3ddf9..fa9159bdd8d 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -415,3 +415,7 @@ func (vg *volumeGroup) CreateSnapshots( return snapshots, nil } + +func (vg *volumeGroup) ToMirror() (types.Mirror, error) { + return volumeGroupMirror{vg}, nil +} diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 61fcfdcaadf..53385aee8d7 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "github.com/csi-addons/spec/lib/go/replication" + "github.com/ceph/ceph-csi/internal/journal" rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" @@ -504,6 +506,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } +<<<<<<< HEAD // RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // This performs the following operations: // - extracts clusterID and Mons from the cluster mapping @@ -633,4 +636,79 @@ func (mgr *rbdManager) RegenerateVolumeGroupJournal( groupHandle, vgName, requestName) return groupHandle, nil +======= +func (mgr *rbdManager) GetMirrorSource(ctx context.Context, reqID string, + rep *replication.ReplicationSource, +) ([]types.Volume, types.Mirror, error) { + switch { + // Backward compatibility: if rep is nil, we assume that the sidecar is still old and + // setting only volumeId not the replication source. 
+ case rep == nil || rep.GetVolume() != nil: + rbdVol, err := mgr.GetVolumeByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdVol.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdVol.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume %s to mirror: %w", rbdVol, err) + } + + return []types.Volume{rbdVol}, mir, nil + case rep.GetVolumegroup() != nil: + rbdGroup, err := mgr.GetVolumeGroupByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume group by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdGroup.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdGroup.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume group %s to mirror: %w", rbdGroup, err) + } + var vols []types.Volume + vols, err = rbdGroup.ListVolumes(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to list volumes in volume group %q: %w", rbdGroup, err) + } + // Get all the volume with connection and return it + volumes := make([]types.Volume, len(vols)) + // Destroy connections if there is any error + defer func() { + if err != nil { + for _, vol := range vols { + vol.Destroy(ctx) + } + } + }() + + for i, vol := range vols { + var id string + id, err = vol.GetID(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get id for volume %q in group %q: %w", vol, rbdGroup, err) + } + var v types.Volume + v, err = mgr.GetVolumeByID(ctx, id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q in group %q: %w", id, rbdGroup, err) + } + volumes[i] = v + } + + return volumes, mir, nil + + default: + return nil, nil, errors.New("replication source is not set") + } +>>>>>>> 71b235ff1 (rbd: implement GetMirrorSource in manager) } diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 
08183f9fb63..f9b9b8f3782 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -73,4 +73,7 @@ type VolumeGroup interface { // The Snapshots are crash consistent, and created as a consistency // group. CreateSnapshots(ctx context.Context, cr *util.Credentials, name string) ([]Snapshot, error) + + // ToMirror converts the VolumeGroup to a Mirror. + ToMirror() (Mirror, error) } diff --git a/internal/rbd/types/manager.go b/internal/rbd/types/manager.go index 458bc93dc0d..ca3bf63a456 100644 --- a/internal/rbd/types/manager.go +++ b/internal/rbd/types/manager.go @@ -18,6 +18,8 @@ package types import ( "context" + + "github.com/csi-addons/spec/lib/go/replication" ) // VolumeResolver can be used to construct a Volume from a CSI VolumeId. @@ -71,4 +73,8 @@ type Manager interface { // RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // returns the volume group handle RegenerateVolumeGroupJournal(ctx context.Context, groupID, requestName string, volumeIds []string) (string, error) + + // GetMirrorSource returns the source of the mirror for the given volume or group. + GetMirrorSource(ctx context.Context, volumeID string, + rep *replication.ReplicationSource) ([]Volume, Mirror, error) } From a5c129dbe97a2ea0804787b7be0b1fdb06c44af7 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Thu, 24 Oct 2024 10:45:48 +0200 Subject: [PATCH 3/6] ci: remove this commit later This is temporary test only commit and not suitable for review. 
Signed-off-by: Madhu Rajanna --- build.env | 2 +- scripts/Dockerfile.test | 2 +- vendor/github.com/ceph/go-ceph/rbd/group.go | 9 +- .../ceph/go-ceph/rbd/mirror_group.go | 332 ++++++++++++++++++ 4 files changed, 340 insertions(+), 5 deletions(-) create mode 100644 vendor/github.com/ceph/go-ceph/rbd/mirror_group.go diff --git a/build.env b/build.env index f25018ad303..a2acc5663b0 100644 --- a/build.env +++ b/build.env @@ -15,7 +15,7 @@ CSI_IMAGE_VERSION=canary CSI_UPGRADE_VERSION=v3.12.1 # Ceph version to use -BASE_IMAGE=quay.io/ceph/ceph:v19 +BASE_IMAGE=quay.ceph.io/ceph-ci/ceph:wip-pkalever-rbd-group-snap-mirror CEPH_VERSION=squid # standard Golang options diff --git a/scripts/Dockerfile.test b/scripts/Dockerfile.test index def6a3325eb..2ed34367faa 100644 --- a/scripts/Dockerfile.test +++ b/scripts/Dockerfile.test @@ -8,7 +8,7 @@ # little different. # -FROM registry.fedoraproject.org/fedora:latest +FROM quay.ceph.io/ceph-ci/ceph:wip-pkalever-rbd-group-snap-mirror ARG GOPATH=/go ARG GOROOT=/usr/local/go diff --git a/vendor/github.com/ceph/go-ceph/rbd/group.go b/vendor/github.com/ceph/go-ceph/rbd/group.go index 654d15e3e33..b36647bd927 100644 --- a/vendor/github.com/ceph/go-ceph/rbd/group.go +++ b/vendor/github.com/ceph/go-ceph/rbd/group.go @@ -110,7 +110,8 @@ func GroupImageAdd(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cImageName) + cImageName, + C.uint32_t(0)) return getError(ret) } @@ -135,7 +136,8 @@ func GroupImageRemove(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cImageName) + cImageName, + C.uint32_t(0)) return getError(ret) } @@ -160,7 +162,8 @@ func GroupImageRemoveByID(groupIoctx *rados.IOContext, groupName string, cephIoctx(groupIoctx), cGroupName, cephIoctx(imageIoctx), - cid) + cid, + C.uint32_t(0)) return getError(ret) } diff --git a/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go 
b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go new file mode 100644 index 00000000000..c295597932d --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go @@ -0,0 +1,332 @@ +//go:build ceph_preview + +package rbd + +// #cgo LDFLAGS: -lrbd +// #include +// #include +import "C" +import ( + "fmt" + "unsafe" + + "github.com/ceph/go-ceph/internal/cutil" + "github.com/ceph/go-ceph/rados" +) + +// MirrorGroupEnable will enable mirroring for a group using the specified mode. +// +// Implements: +// +// int rbd_mirror_group_enable(rados_ioctx_t p, const char *name, +// rbd_mirror_image_mode_t mirror_image_mode, +// uint32_t flags); +func MirrorGroupEnable(groupIoctx *rados.IOContext, groupName string, mode ImageMirrorMode) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_enable( + cephIoctx(groupIoctx), + cGroupName, + C.rbd_mirror_image_mode_t(mode), + (C.uint32_t)(2), + ) + return getError(ret) +} + +// MirrorGroupDisable will disabling mirroring for a group +// +// Implements: +// +// int rbd_mirror_group_disable(rados_ioctx_t p, const char *name, +// bool force) +func MirrorGroupDisable(groupIoctx *rados.IOContext, groupName string, force bool) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_disable( + cephIoctx(groupIoctx), + cGroupName, + C.bool(force)) + return getError(ret) +} + +// MirrorGroupPromote will promote the mirrored group to primary status +// +// Implements: +// +// int rbd_mirror_group_promote(rados_ioctx_t p, const char *name, +// uint32_t flags, bool force) +func MirrorGroupPromote(groupIoctx *rados.IOContext, groupName string, force bool) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_promote( + cephIoctx(groupIoctx), + cGroupName, + (C.uint32_t)(0), + C.bool(force)) + return getError(ret) +} + +// MirrorGroupDemote will demote 
the mirrored group to primary status +// +// Implements: +// +// int rbd_mirror_group_demote(rados_ioctx_t p, const char *name, +// uint32_t flags) +func MirrorGroupDemote(groupIoctx *rados.IOContext, groupName string) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_demote( + cephIoctx(groupIoctx), + cGroupName, + (C.uint32_t)(0)) + return getError(ret) +} + +// MirrorGroupResync is used to manually resolve split-brain status by triggering +// resynchronization +// +// Implements: +// +// int rbd_mirror_group_resync(rados_ioctx_t p, const char *name) +func MirrorGroupResync(groupIoctx *rados.IOContext, groupName string) error { + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + ret := C.rbd_mirror_group_resync( + cephIoctx(groupIoctx), + cGroupName) + return getError(ret) +} + +// MirrorGroupState represents the current state of the mirrored group +type MirrorGroupState C.rbd_mirror_group_state_t + +// String representation of MirrorGroupState. +func (mgs MirrorGroupState) String() string { + switch mgs { + case MirrorGroupEnabled: + return "enabled" + case MirrorGroupDisabled: + return "disabled" + case MirrorGroupEnabling: + return "enabling" + case MirrorGrpupDisabling: + return "disabled" + default: + return "" + } +} + +const ( + // MirrorGrpupDisabling is the representation of + // RBD_MIRROR_GROUP_DISABLING from librbd. + MirrorGrpupDisabling = MirrorGroupState(C.RBD_MIRROR_GROUP_DISABLING) + // MirrorGroupEnabling is the representation of + // RBD_MIRROR_GROUP_ENABLING from librbd + MirrorGroupEnabling = MirrorGroupState(C.RBD_MIRROR_GROUP_ENABLING) + // MirrorGroupEnabled is the representation of + // RBD_MIRROR_IMAGE_ENABLED from librbd. + MirrorGroupEnabled = MirrorGroupState(C.RBD_MIRROR_GROUP_ENABLED) + // MirrorGroupDisabled is the representation of + // RBD_MIRROR_GROUP_DISABLED from librbd. 
+ MirrorGroupDisabled = MirrorGroupState(C.RBD_MIRROR_GROUP_DISABLED) +) + +// MirrorGroupInfo represents the mirroring status information of group. +type MirrorGroupInfo struct { + GlobalID string + State MirrorGroupState + MirrorImageMode ImageMirrorMode + Primary bool +} + +// GetMirrorGroupInfo returns the mirroring status information of the mirrored group +// +// Implements: +// +// int rbd_mirror_group_get_info(rados_ioctx_t p, const char *name, +// rbd_mirror_group_info_t *mirror_group_info, +// size_t info_size) +func GetMirrorGroupInfo(groupIoctx *rados.IOContext, groupName string) (*MirrorGroupInfo, error) { + var cgInfo C.rbd_mirror_group_info_t + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + + ret := C.rbd_mirror_group_get_info( + cephIoctx(groupIoctx), + cGroupName, + &cgInfo, + C.sizeof_rbd_mirror_group_info_t) + + if ret < 0 { + return nil, getError(ret) + } + + info := convertMirrorGroupInfo(&cgInfo) + + // free C memory allocated by C.rbd_mirror_group_get_info call + C.rbd_mirror_group_get_info_cleanup(&cgInfo) + return &info, nil + +} + +func convertMirrorGroupInfo(cgInfo *C.rbd_mirror_group_info_t) MirrorGroupInfo { + return MirrorGroupInfo{ + GlobalID: C.GoString(cgInfo.global_id), + MirrorImageMode: ImageMirrorMode(cgInfo.mirror_image_mode), + State: MirrorGroupState(cgInfo.state), + Primary: bool(cgInfo.primary), + } +} + +// MirrorGroupStatusState is used to indicate the state of a mirrored group +// within the site status info. 
+type MirrorGroupStatusState int64 + +const ( + // MirrorGroupStatusStateUnknown is equivalent to MIRROR_GROUP_STATUS_STATE_UNKNOWN + MirrorGroupStatusStateUnknown = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_UNKNOWN) + // MirrorGroupStatusStateError is equivalent to MIRROR_GROUP_STATUS_STATE_ERROR + MirrorGroupStatusStateError = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_ERROR) + // MirrorGroupStatusStateStartingReplay is equivalent to MIRROR_GROUP_STATUS_STATE_STARTING_REPLAY + MirrorGroupStatusStateStartingReplay = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STARTING_REPLAY) + // MirrorGroupStatusStateReplaying is equivalent to MIRROR_GROUP_STATUS_STATE_REPLAYING + MirrorGroupStatusStateReplaying = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_REPLAYING) + // MirrorGroupStatusStateStoppingReplay is equivalent to MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY + MirrorGroupStatusStateStoppingReplay = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY) + // MirrorGroupStatusStateStopped is equivalent to MIRROR_IMAGE_GROUP_STATUS_STATE_STOPPED + MirrorGroupStatusStateStopped = MirrorGroupStatusState(C.MIRROR_GROUP_STATUS_STATE_STOPPED) +) + +// String represents the MirrorImageStatusState as a short string. +func (state MirrorGroupStatusState) String() (s string) { + switch state { + case MirrorGroupStatusStateUnknown: + s = "unknown" + case MirrorGroupStatusStateError: + s = "error" + case MirrorGroupStatusStateStartingReplay: + s = "starting_replay" + case MirrorGroupStatusStateReplaying: + s = "replaying" + case MirrorGroupStatusStateStoppingReplay: + s = "stopping_replay" + case MirrorGroupStatusStateStopped: + s = "stopped" + default: + s = fmt.Sprintf("unknown(%d)", state) + } + return s +} + +// SiteMirrorGroupStatus contains information pertaining to the status of +// a mirrored group within a site. 
+type SiteMirrorGroupStatus struct { + MirrorUUID string + State MirrorGroupStatusState + MirrorImageCount int + MirrorImagePoolIDs int64 + MirrorImageGlobalIDs string + MirrorImages []SiteMirrorImageStatus + Description string + LastUpdate int64 + Up bool +} + +// GlobalMirrorGroupStatus contains information pertaining to the global +// status of a mirrored group. It contains general information as well +// as per-site information stored in the SiteStatuses slice. +type GlobalMirrorGroupStatus struct { + Name string + Info MirrorGroupInfo + SiteStatusesCount int + SiteStatuses []SiteMirrorGroupStatus +} + +// LocalStatus returns one SiteMirrorGroupStatus item from the SiteStatuses +// slice that corresponds to the local site's status. If the local status +// is not found than the error ErrNotExist will be returned. +func (gmis GlobalMirrorGroupStatus) LocalStatus() (SiteMirrorGroupStatus, error) { + var ( + ss SiteMirrorGroupStatus + err error = ErrNotExist + ) + for i := range gmis.SiteStatuses { + // I couldn't find it explicitly documented, but a site mirror uuid + // of an empty string indicates that this is the local site. + // This pattern occurs in both the pybind code and ceph c++. + if gmis.SiteStatuses[i].MirrorUUID == "" { + ss = gmis.SiteStatuses[i] + err = nil + break + } + } + return ss, err +} + +type groupSiteArray [cutil.MaxIdx]C.rbd_mirror_group_site_status_t + +// GetGlobalMirrorGroupStatus returns status information pertaining to the state +// of a groups's mirroring. 
+// +// Implements: +// +// int rbd_mirror_group_get_global_status( +// IoCtx& io_ctx, +// const char *group_name +// mirror_group_global_status_t *mirror_group_status, +// size_t status_size); +func GetGlobalMirrorGroupStatus(ioctx *rados.IOContext, groupName string) (GlobalMirrorGroupStatus, error) { + s := C.rbd_mirror_group_global_status_t{} + cGroupName := C.CString(groupName) + defer C.free(unsafe.Pointer(cGroupName)) + // ret := C.rbd_mirror_group_get_global_status( + // cephIoctx(ioctx), + // (*C.char)(cGroupName), + // &s, + // C.sizeof_rbd_mirror_group_global_status_t) + // if err := getError(ret); err != nil { + // return GlobalMirrorGroupStatus{}, err + // } + + status := newGlobalMirrorGroupStatus(&s) + return status, nil +} + +func newGlobalMirrorGroupStatus( + s *C.rbd_mirror_group_global_status_t) GlobalMirrorGroupStatus { + + status := GlobalMirrorGroupStatus{ + Name: C.GoString(s.name), + Info: convertMirrorGroupInfo(&s.info), + SiteStatusesCount: int(s.site_statuses_count), + SiteStatuses: make([]SiteMirrorGroupStatus, s.site_statuses_count), + } + gsscs := (*groupSiteArray)(unsafe.Pointer(s.site_statuses))[:s.site_statuses_count:s.site_statuses_count] + for i := C.uint32_t(0); i < s.site_statuses_count; i++ { + gss := gsscs[i] + status.SiteStatuses[i] = SiteMirrorGroupStatus{ + MirrorUUID: C.GoString(gss.mirror_uuid), + MirrorImageGlobalIDs: C.GoString(*gss.mirror_image_global_ids), + MirrorImagePoolIDs: int64(*gss.mirror_image_pool_ids), + State: MirrorGroupStatusState(gss.state), + Description: C.GoString(gss.description), + MirrorImageCount: int(gss.mirror_image_count), + LastUpdate: int64(gss.last_update), + MirrorImages: make([]SiteMirrorImageStatus, gss.mirror_image_count), + Up: bool(gss.up), + } + + sscs := (*siteArray)(unsafe.Pointer(gss.mirror_images))[:gss.mirror_image_count:gss.mirror_image_count] + for i := C.uint32_t(0); i < gss.mirror_image_count; i++ { + ss := sscs[i] + status.SiteStatuses[i].MirrorImages[i] = 
SiteMirrorImageStatus{ + MirrorUUID: C.GoString(ss.mirror_uuid), + State: MirrorImageStatusState(ss.state), + Description: C.GoString(ss.description), + LastUpdate: int64(ss.last_update), + Up: bool(ss.up), + } + } + } + return status +} From 0dad1762964fe89ee4cbfe98e76c66924839d29e Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Thu, 21 Nov 2024 13:38:49 +0100 Subject: [PATCH 4/6] ci: extra commit will be dropped This is extra commit will be dropped Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 6 +- internal/rbd/group/group_mirror.go | 55 ++++++++- .../ceph/go-ceph/rbd/mirror_group.go | 115 ++++++++++++------ 3 files changed, 137 insertions(+), 39 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index bd9151d8e61..61a8a321449 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -876,6 +876,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } + log.DebugLog(ctx, "mirrorStatus: %+v", mirrorStatus) remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx) if err != nil { log.ErrorLog(ctx, "failed to get remote site status for mirror %q: %v", mirror, err) @@ -899,6 +900,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err) } + log.DebugLog(ctx, "Madhu the response is %v", resp) return resp, nil } @@ -923,7 +925,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV if description == "" { return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound) } - log.DebugLog(ctx, "description: %s", description) + log.DebugLog(ctx, "Madhu description: %s", description) splittedString := strings.SplitN(description, ",", 2) if len(splittedString) == 1 { return nil, fmt.Errorf("no snapshot details: %w", 
corerbd.ErrLastSyncTimeNotFound) @@ -940,6 +942,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err) } + log.DebugLog(ctx, "Madhu the description after unmarshalling is %v", localSnapInfo) // If the json unmarsal is successful but the local snapshot time is 0, we // need to consider it as an error as the LastSyncTime is required. if localSnapInfo.LocalSnapshotTime == 0 { @@ -965,6 +968,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV response.LastSyncTime = lastSyncTime response.LastSyncBytes = localSnapInfo.LastSnapshotBytes + log.DebugLog(ctx, "Madhu the return response is %v", response) return &response, nil } diff --git a/internal/rbd/group/group_mirror.go b/internal/rbd/group/group_mirror.go index 5d68597e756..f4a3a61cf9d 100644 --- a/internal/rbd/group/group_mirror.go +++ b/internal/rbd/group/group_mirror.go @@ -15,8 +15,10 @@ package group import ( "context" + "encoding/json" "errors" "fmt" + "strings" "time" "github.com/ceph/go-ceph/rados" @@ -279,6 +281,12 @@ func (status globalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) ( err error = librbd.ErrNotExist ) + type localStatus struct { + LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` + LastSnapshotBytes int64 `json:"last_snapshot_bytes"` + LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"` + } + for i := range status.SiteStatuses { log.DebugLog( ctx, @@ -291,8 +299,53 @@ func (status globalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) ( if status.SiteStatuses[i].MirrorUUID != "" { ss = status.SiteStatuses[i] - err = nil + images := status.SiteStatuses[i].MirrorImages + + totalSnpshotTime := int64(0) + totalSnapshotBytes := int64(0) + totalSnapshotDuration := int64(0) + totalImages := len(images) + for _, image := range images { + log.DebugLog(ctx, "Madhu image: %s, state: %s, description: %s, lastUpdate: %v, up: 
%t", + image.MirrorUUID, + image.State, + image.Description, + image.LastUpdate, + image.Up) + description := image.Description + log.DebugLog(ctx, "[Madhu] description: %s", description) + splittedString := strings.SplitN(description, ",", 2) + if len(splittedString) == 1 { + log.DebugLog(ctx, "no snapshot details", splittedString[0]) + continue + } + var localSnapInfo localStatus + err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err) + } + log.DebugLog(ctx, "Madhu localStatus", localSnapInfo) + totalSnpshotTime += localSnapInfo.LocalSnapshotTime + totalSnapshotBytes += localSnapInfo.LastSnapshotBytes + totalSnapshotDuration += *localSnapInfo.LastSnapshotDuration + } + err = nil + totalDuration := int64(totalSnapshotDuration / int64(totalImages)) + // write the total snapshot time, bytes and duration to the description + d := localStatus{ + LocalSnapshotTime: int64(totalSnpshotTime / int64(totalImages)), + LastSnapshotBytes: int64(totalSnapshotBytes / int64(totalImages)), + LastSnapshotDuration: &totalDuration, + } + description, err := json.Marshal(d) + log.DebugLog(ctx, "description: %s", description) + log.DebugLog(ctx, "description: %v", d) + if err != nil { + + return nil, fmt.Errorf("failed to marshal local snapshot info: %w", err) + } + ss.Description = fmt.Sprintf("%s, %s", ss.Description, description) break } } diff --git a/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go index c295597932d..a33fd6ee273 100644 --- a/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go +++ b/vendor/github.com/ceph/go-ceph/rbd/mirror_group.go @@ -7,6 +7,7 @@ package rbd // #include import "C" import ( + "errors" "fmt" "unsafe" @@ -156,11 +157,19 @@ func GetMirrorGroupInfo(groupIoctx *rados.IOContext, groupName string) (*MirrorG cGroupName, &cgInfo, C.sizeof_rbd_mirror_group_info_t) - + var err error if ret < 0 { - 
return nil, getError(ret) + err = getError(ret) + } + if err != nil { + if errors.Is(err, ErrNotFound) || errors.Is(err, rados.RadosErrorNotFound) { + return &MirrorGroupInfo{ + State: MirrorGroupDisabled, + MirrorImageMode: ImageMirrorModeSnapshot, + Primary: false, + }, nil + } } - info := convertMirrorGroupInfo(&cgInfo) // free C memory allocated by C.rbd_mirror_group_get_info call @@ -279,54 +288,86 @@ func GetGlobalMirrorGroupStatus(ioctx *rados.IOContext, groupName string) (Globa s := C.rbd_mirror_group_global_status_t{} cGroupName := C.CString(groupName) defer C.free(unsafe.Pointer(cGroupName)) - // ret := C.rbd_mirror_group_get_global_status( - // cephIoctx(ioctx), - // (*C.char)(cGroupName), - // &s, - // C.sizeof_rbd_mirror_group_global_status_t) - // if err := getError(ret); err != nil { - // return GlobalMirrorGroupStatus{}, err - // } + ret := C.rbd_mirror_group_get_global_status( + cephIoctx(ioctx), + (*C.char)(cGroupName), + &s, + C.sizeof_rbd_mirror_group_global_status_t) + if err := getError(ret); err != nil { + return GlobalMirrorGroupStatus{}, err + } status := newGlobalMirrorGroupStatus(&s) return status, nil } -func newGlobalMirrorGroupStatus( - s *C.rbd_mirror_group_global_status_t) GlobalMirrorGroupStatus { - +func newGlobalMirrorGroupStatus(s *C.rbd_mirror_group_global_status_t) GlobalMirrorGroupStatus { + // Initializing the status object status := GlobalMirrorGroupStatus{ Name: C.GoString(s.name), Info: convertMirrorGroupInfo(&s.info), SiteStatusesCount: int(s.site_statuses_count), SiteStatuses: make([]SiteMirrorGroupStatus, s.site_statuses_count), } - gsscs := (*groupSiteArray)(unsafe.Pointer(s.site_statuses))[:s.site_statuses_count:s.site_statuses_count] - for i := C.uint32_t(0); i < s.site_statuses_count; i++ { - gss := gsscs[i] - status.SiteStatuses[i] = SiteMirrorGroupStatus{ - MirrorUUID: C.GoString(gss.mirror_uuid), - MirrorImageGlobalIDs: C.GoString(*gss.mirror_image_global_ids), - MirrorImagePoolIDs: 
int64(*gss.mirror_image_pool_ids), - State: MirrorGroupStatusState(gss.state), - Description: C.GoString(gss.description), - MirrorImageCount: int(gss.mirror_image_count), - LastUpdate: int64(gss.last_update), - MirrorImages: make([]SiteMirrorImageStatus, gss.mirror_image_count), - Up: bool(gss.up), - } - sscs := (*siteArray)(unsafe.Pointer(gss.mirror_images))[:gss.mirror_image_count:gss.mirror_image_count] - for i := C.uint32_t(0); i < gss.mirror_image_count; i++ { - ss := sscs[i] - status.SiteStatuses[i].MirrorImages[i] = SiteMirrorImageStatus{ - MirrorUUID: C.GoString(ss.mirror_uuid), - State: MirrorImageStatusState(ss.state), - Description: C.GoString(ss.description), - LastUpdate: int64(ss.last_update), - Up: bool(ss.up), + // Print the count of site statuses for debugging + fmt.Println("status.SiteStatusesCount: ", s.site_statuses_count) + + fmt.Printf("s.site_statuses: %+v\n", s.site_statuses) + // Check if site statuses are not null before using them + if s.site_statuses != nil && s.site_statuses_count > 0 { + gsscs := (*groupSiteArray)(unsafe.Pointer(s.site_statuses))[:s.site_statuses_count:s.site_statuses_count] + for i := C.uint32_t(0); i < s.site_statuses_count; i++ { + gss := gsscs[i] + // Ensure that fields are valid before using them + if gss.mirror_uuid != nil && gss.mirror_image_global_ids != nil { + status.SiteStatuses[i] = SiteMirrorGroupStatus{ + MirrorUUID: C.GoString(gss.mirror_uuid), + MirrorImageGlobalIDs: C.GoString(*gss.mirror_image_global_ids), + MirrorImagePoolIDs: int64(*gss.mirror_image_pool_ids), + State: MirrorGroupStatusState(gss.state), + Description: C.GoString(gss.description), + MirrorImageCount: int(gss.mirror_image_count), + LastUpdate: int64(gss.last_update), + MirrorImages: make([]SiteMirrorImageStatus, gss.mirror_image_count), + Up: bool(gss.up), + } + + // Check if the mirror_images pointer is valid + if gss.mirror_images != nil && gss.mirror_image_count > 0 { + + sscs := 
(*siteArray)(unsafe.Pointer(gss.mirror_images))[:gss.mirror_image_count:gss.mirror_image_count] + fmt.Printf("sscs: siteArray %+v\n", sscs) + for j := C.uint32_t(0); j < gss.mirror_image_count; j++ { + ss := sscs[j] + + // Ensure that fields are valid before using them + if ss.mirror_uuid != nil { + status.SiteStatuses[i].MirrorImages[j] = SiteMirrorImageStatus{ + MirrorUUID: C.GoString(ss.mirror_uuid), + State: MirrorImageStatusState(ss.state), + Description: C.GoString(ss.description), + LastUpdate: int64(ss.last_update), + Up: bool(ss.up), + } + } else { + // Log if a field is invalid + fmt.Println("Warning: mirror_uuid is nil at index", i, j) + } + } + } else { + // Handle case where mirror_images is nil or mirror_image_count is 0 + fmt.Println("Warning: mirror_images is nil or mirror_image_count is 0 at index", i) + } + } else { + // Handle case where mirror_uuid or mirror_image_global_ids is nil + fmt.Println("Warning: mirror_uuid or mirror_image_global_ids is nil at index", i) } } + } else { + // Handle case where site statuses are nil or count is 0 + fmt.Println("Warning: site_statuses is nil or site_statuses_count is 0") } + // Return the populated status return status } From 22966e8a1c40ed7ca1bb3c981b4abd5cac049ac9 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 13 Jan 2025 15:16:06 +0100 Subject: [PATCH 5/6] ci: extra commit to fix panic This commit will be removed later Signed-off-by: Madhu Rajanna --- internal/rbd/group/group_mirror.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/rbd/group/group_mirror.go b/internal/rbd/group/group_mirror.go index f4a3a61cf9d..751fd4d67ca 100644 --- a/internal/rbd/group/group_mirror.go +++ b/internal/rbd/group/group_mirror.go @@ -331,11 +331,22 @@ func (status globalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) ( totalSnapshotDuration += *localSnapInfo.LastSnapshotDuration } err = nil - totalDuration := int64(totalSnapshotDuration / 
int64(totalImages)) + totalDuration := int64(0) + if totalSnapshotDuration > 0 { + totalDuration = int64(totalSnapshotDuration / int64(totalImages)) + } + totalTime := int64(0) + if totalSnpshotTime > 0 { + totalTime = int64(totalSnpshotTime / int64(totalImages)) + } + totalBytes := int64(0) + if totalSnapshotBytes > 0 { + totalBytes = int64(totalSnapshotBytes / int64(totalImages)) + } // write the total snapshot time, bytes and duration to the description d := localStatus{ - LocalSnapshotTime: int64(totalSnpshotTime / int64(totalImages)), - LastSnapshotBytes: int64(totalSnapshotBytes / int64(totalImages)), + LocalSnapshotTime: totalTime, + LastSnapshotBytes: totalBytes, LastSnapshotDuration: &totalDuration, } description, err := json.Marshal(d) From 2b965588429c569b13db0300bc4fe11e7a171798 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 20 Jan 2025 10:01:20 +0100 Subject: [PATCH 6/6] rbd: delete group only if its primary This is a work of Nikhil which need to be applied on top of this PR to test the feature. 
Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/volumegroup.go | 62 +++++++++++++++++++------- internal/rbd/group/volume_group.go | 57 +++++++++++++++++++---- internal/rbd/group_controllerserver.go | 4 +- internal/rbd/types/group.go | 4 +- 4 files changed, 99 insertions(+), 28 deletions(-) diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index dd349ac8af0..e373132b60c 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -27,6 +27,7 @@ import ( "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util/log" + "github.com/csi-addons/spec/lib/go/replication" "github.com/csi-addons/spec/lib/go/volumegroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -206,27 +207,41 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup( log.DebugLog(ctx, "VolumeGroup %q has been found", req.GetVolumeGroupId()) - // verify that the volume group is empty - volumes, err := vg.ListVolumes(ctx) + volumes, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{ + Type: &replication.ReplicationSource_Volumegroup{ + Volumegroup: &replication.ReplicationSource_VolumeGroupSource{ + VolumeGroupId: req.GetVolumeGroupId(), + }, + }, + }) if err != nil { - return nil, status.Errorf( - codes.NotFound, - "could not list volumes for voluem group %q: %s", - req.GetVolumeGroupId(), - err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) - log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)) + vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx) - if len(volumes) != 0 { - return nil, status.Errorf( - codes.FailedPrecondition, - "rejecting to delete non-empty volume group %q", - req.GetVolumeGroupId()) + // verify that the volume group is empty, if the group is primary + if vgrMirrorInfo.IsPrimary() { + volumes, err := vg.ListVolumes(ctx) + if err != nil { + return 
nil, status.Errorf( + codes.NotFound, + "could not list volumes for volume group %q: %s", + req.GetVolumeGroupId(), + err.Error()) + } + log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)) + if len(volumes) != 0 { + return nil, status.Errorf( + codes.FailedPrecondition, + "rejecting to delete non-empty volume group %q", + req.GetVolumeGroupId()) + } } // delete the volume group - err = vg.Delete(ctx) + err = vg.Delete(ctx, vgrMirrorInfo, mirror) if err != nil { return nil, status.Errorf(codes.Internal, "failed to delete volume group %q: %s", @@ -336,10 +351,27 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( } } + vol, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{ + Type: &replication.ReplicationSource_Volumegroup{ + Volumegroup: &replication.ReplicationSource_VolumeGroupSource{ + VolumeGroupId: req.GetVolumeGroupId(), + }, + }, + }) + if err != nil { + return nil, getGRPCError(err) + } + defer destoryVolumes(ctx, vol) + + vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx) + + // Skip removing images from group if the group is secondary + removeImageFromGroup := vgrMirrorInfo.IsPrimary() + // remove the volume that should not be part of the group for _, id := range toRemove { vol := beforeIDs[id] - err = vg.RemoveVolume(ctx, vol) + err = vg.RemoveVolume(ctx, vol, removeImageFromGroup) if err != nil { return nil, status.Errorf( codes.Internal, diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index fa9159bdd8d..55cd78201d8 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -26,6 +26,8 @@ import ( librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/csi-addons/spec/lib/go/volumegroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" @@ 
-184,7 +186,7 @@ func (vg *volumeGroup) Create(ctx context.Context) error { return nil } -func (vg *volumeGroup) Delete(ctx context.Context) error { +func (vg *volumeGroup) Delete(ctx context.Context, vgrMirrorInfo types.MirrorInfo, mirror types.Mirror) error { name, err := vg.GetName(ctx) if err != nil { return err } @@ -195,6 +197,41 @@ func (vg *volumeGroup) Delete(ctx context.Context) error { return err } + // Cleanup only omap data if the following condition is met + // Mirroring is enabled on the group + // Local group is secondary + // Local group is in up+replaying state + if vgrMirrorInfo != nil && vgrMirrorInfo.GetState() == librbd.MirrorGroupEnabled.String() && !vgrMirrorInfo.IsPrimary() { + log.DebugLog(ctx, "volume group %v is in %v state and is primary %v", vg, vgrMirrorInfo.GetState(), vgrMirrorInfo.IsPrimary()) + // If the group is in a secondary state and it's up+replaying, it means + // it's a healthy secondary and the group is primary somewhere in the + // remote cluster and the local group is getting replayed. Delete the + // OMAP data generated as we cannot delete the secondary group. When + // the group on the primary cluster gets deleted/mirroring disabled, + // the group on all the remote (secondary) clusters will get + // auto-deleted. This helps in garbage collecting the OMAP, VR, VGR, + // VGRC, PVC and PV objects after failback operation. 
+ if mirror != nil { + sts, rErr := mirror.GetGlobalMirroringStatus(ctx) + if rErr != nil { + return status.Error(codes.Internal, rErr.Error()) + } + localStatus, rErr := sts.GetLocalSiteStatus() + if rErr != nil { + log.ErrorLog(ctx, "failed to get local status for volume group %s: %v", name, rErr) + return status.Error(codes.Internal, rErr.Error()) + } + log.DebugLog(ctx, "local status is %v and local state is %v", localStatus.IsUP(), localStatus.GetState()) + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorGroupStatusStateReplaying.String() { + return vg.commonVolumeGroup.Delete(ctx) + } + log.ErrorLog(ctx, + "secondary group status is up=%t and state=%s", + localStatus.IsUP(), + localStatus.GetState()) + } + } + err = librbd.GroupRemove(ioctx, name) if err != nil && !errors.Is(err, rados.ErrNotFound) { return fmt.Errorf("failed to remove volume group %q: %w", vg, err) @@ -252,21 +289,23 @@ func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error { return nil } -func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error { +func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume, removeImageFromGroup bool) error { // volume was already removed from the group if len(vg.volumes) == 0 { return nil } - err := vol.RemoveFromGroup(ctx, vg) - if err != nil { - if errors.Is(err, librbd.ErrNotExist) { - return nil - } + if removeImageFromGroup { + log.DebugLog(ctx, "removing image %v from group %v", vol, vg) + err := vol.RemoveFromGroup(ctx, vg) + if err != nil { + if errors.Is(err, librbd.ErrNotExist) { + return nil + } - return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err) + return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err) + } } - // toRemove contain the ID of the volume that is removed from the group toRemove, err := vol.GetID(ctx) if err != nil { diff --git a/internal/rbd/group_controllerserver.go 
b/internal/rbd/group_controllerserver.go index a4b313ea7cd..c8f2e9181b6 100644 --- a/internal/rbd/group_controllerserver.go +++ b/internal/rbd/group_controllerserver.go @@ -72,7 +72,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( for _, volume := range volumes { if vg != nil { // 'normal' cleanup, remove all images from the group - vgErr := vg.RemoveVolume(ctx, volume) + vgErr := vg.RemoveVolume(ctx, volume, true) if vgErr != nil { log.ErrorLog( ctx, @@ -91,7 +91,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( // the VG should always be deleted, volumes can only belong to a single VG log.DebugLog(ctx, "removing temporary volume group %q", vg) - vgErr := vg.Delete(ctx) + vgErr := vg.Delete(ctx, nil, nil) if vgErr != nil { log.ErrorLog(ctx, "failed to remove temporary volume group %q: %v", vg, vgErr) } diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index f9b9b8f3782..c83f951d4e4 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -58,13 +58,13 @@ type VolumeGroup interface { Create(ctx context.Context) error // Delete removes the VolumeGroup from the backend storage. - Delete(ctx context.Context) error + Delete(ctx context.Context, vgMirrorInfo MirrorInfo, mirror Mirror) error // AddVolume adds the Volume to the VolumeGroup. AddVolume(ctx context.Context, volume Volume) error // RemoveVolume removes the Volume from the VolumeGroup. - RemoveVolume(ctx context.Context, volume Volume) error + RemoveVolume(ctx context.Context, volume Volume, removeImageFromGroup bool) error // ListVolumes returns a slice with all Volumes in the VolumeGroup. ListVolumes(ctx context.Context) ([]Volume, error)