From ed5b2506b3cff906601d3d151c666c50487e9f73 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 29 Jul 2024 13:15:52 +0200 Subject: [PATCH] rbd: implement volume group using go-ceph This adds the required functionality to call the go-ceph API's for the volume group. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 12 +- internal/rbd/group/volume_group.go | 297 ++++++++++++++++++++++++- internal/rbd/types/group.go | 3 + 3 files changed, 306 insertions(+), 6 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 03f40425b5b9..4bbfec2dd968 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -29,6 +29,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -818,11 +819,12 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ - corerbd.ErrInvalidArgument: codes.InvalidArgument, - corerbd.ErrFlattenInProgress: codes.Aborted, - corerbd.ErrAborted: codes.Aborted, - corerbd.ErrFailedPrecondition: codes.FailedPrecondition, - corerbd.ErrUnavailable: codes.Unavailable, + corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, + corerbd.ErrAborted: codes.Aborted, + corerbd.ErrFailedPrecondition: codes.FailedPrecondition, + corerbd.ErrUnavailable: codes.Unavailable, + rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable, } for e, code := range errorStatusMap { diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index 6e6e59019420..9151971e1461 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "strings" + "time" "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/csi-addons/spec/lib/go/volumegroup" @@ -33,7 +35,10 @@ import ( "github.com/ceph/ceph-csi/internal/util/log" ) -var ErrRBDGroupNotConnected = errors.New("RBD group is not connected") +var ( + ErrRBDGroupNotConnected = errors.New("RBD group is not connected") + ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable") +) // volumeGroup handles all requests for 'rbd group' operations. type volumeGroup struct { @@ -465,3 +470,293 @@ func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) { return vg.volumes, nil } + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error { + /*name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupEnable(ioctx, name, mode) + if err != nil { + return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err) + }*/ + + log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg) + + return nil +} + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) DisableMirroring(ctx context.Context, force bool) error { + /*name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDisable(ioctx, name, force) + if err != nil && !errors.Is(rados.ErrNotFound, err) { + return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err) + }*/ + + log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg) + + return nil +} + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) Promote(ctx context.Context, force bool) error { + /*name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupPromote(ioctx, name, force) + if err != nil { + return fmt.Errorf("failed to promote volume group %q: %w", vg, err) + }*/ + + log.DebugLog(ctx, "volume group %q has been promoted", vg) + + return nil +} + +func (vg *volumeGroup) ForcePromote(ctx context.Context, cr *util.Credentials) error { + promoteArgs := []string{ + "mirror", "group", "promote", + vg.String(), + "--force", + "--id", cr.ID, + "-m", vg.monitors, + "--keyfile=" + cr.KeyFile, + } + _, stderr, err := util.ExecCommandWithTimeout( + ctx, + // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. + 2*time.Minute, + "rbd", + promoteArgs..., + ) + if err != nil { + return fmt.Errorf("failed to promote group %q with error: %w", vg, err) + } + + if stderr != "" { + return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr) + } + + return nil +} + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) Demote(ctx context.Context) error { + /*name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDemote(ioctx, name) + if err != nil { + return fmt.Errorf("failed to demote volume group %q: %w", vg, err) + }*/ + + log.DebugLog(ctx, "volume group %q has been demoted", vg) + + return nil +} + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) Resync(ctx context.Context) error { + /*name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupResync(ioctx, name) + if err != nil { + return fmt.Errorf("failed to resync volume group %q: %w", vg, err) + }*/ + + log.DebugLog(ctx, "issued resync on volume group %q", vg) + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable) +} + +//nolint:gocritic // TODO: remove this comment +func (vg *volumeGroup) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) { + /*name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + + info, err := librbd.GetMirrorGroupInfo(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to resync volume group %q: %w", vg, err) + } + return GroupStatus{MirrorGroupInfo: info}, nil + */ + return nil, nil +} + +func (vg *volumeGroup) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) { + return nil, nil +} + +func (vg *volumeGroup) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error { + ls := admin.NewLevelSpec(vg.pool, vg.namespace, "") + ra, err := vg.conn.GetRBDAdmin() + if err != nil { + return err + } + adminConn := ra.MirrorSnashotSchedule() + err = adminConn.Add(ls, interval, startTime) + if err != nil { + return err + } + + return nil +} + +func (vg *volumeGroup) ToMirror() (types.Mirror, error) { + return vg, nil +} + +// GroupStatus is a wrapper around librbd.MirrorGroupInfo that contains the +// group mirror status. +type GroupStatus struct { + // *librbd.MirrorGroupInfo //nolint:gocritic // TODO: remove this comment +} + +func (status GroupStatus) GetState() string { + // return status.State.String() //nolint:gocritic // TODO: remove this comment + return "" +} + +func (status GroupStatus) IsPrimary() bool { + // return status.Primary //nolint:gocritic // TODO: remove this comment + return false +} + +// GlobalGroupMirrorStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the +// global mirror group status. +type GlobalGroupMirrorStatus struct { + librbd.GlobalMirrorImageStatus +} + +func (status GlobalGroupMirrorStatus) GetState() string { + return status.GlobalMirrorImageStatus.Info.State.String() +} + +func (status GlobalGroupMirrorStatus) IsPrimary() bool { + return status.GlobalMirrorImageStatus.Info.Primary +} + +func (status GlobalGroupMirrorStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorImageStatus.LocalStatus() + if err != nil { + err = fmt.Errorf("failed to get local site status: %w", err) + } + + return SiteMirrorImageStatus{ + SiteMirrorImageStatus: s, + }, err +} + +func (status GlobalGroupMirrorStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for _, ss := range status.SiteStatuses { + siteStatuses = append(siteStatuses, SiteMirrorImageStatus{SiteMirrorImageStatus: ss}) + } + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status GlobalGroupMirrorStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorImageStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } + } + + return SiteMirrorImageStatus{SiteMirrorImageStatus: ss}, err +} + +// SiteMirrorImageStatus is a wrapper around librbd.SiteMirrorImageStatus that contains the +// site mirror image status. +type SiteMirrorImageStatus struct { + librbd.SiteMirrorImageStatus +} + +func (status SiteMirrorImageStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status SiteMirrorImageStatus) GetState() string { + return status.State.String() +} + +func (status SiteMirrorImageStatus) GetDescription() string { + return status.Description +} + +func (status SiteMirrorImageStatus) IsUP() bool { + return status.Up +} + +func (status SiteMirrorImageStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() +} diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 36f89e807f5a..018455f1575f 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -66,4 +66,7 @@ type VolumeGroup interface { // ListVolumes returns a slice with all Volumes in the VolumeGroup. ListVolumes(ctx context.Context) ([]Volume, error) + + // ToMirror converts the VolumeGroup to a Mirror. + ToMirror() (Mirror, error) }