Skip to content

Commit

Permalink
rbd: update replication to work with group
Browse files Browse the repository at this point in the history
Made the required changes to the replication
and manager interfaces so that they work with
RBD volume groups as well as individual volumes.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 committed Jul 31, 2024
1 parent 074d954 commit 639ca8f
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 111 deletions.
186 changes: 91 additions & 95 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,12 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)
// extract the mirroring mode
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
if err != nil {
Expand All @@ -307,11 +304,13 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}
if info.GetState() != librbd.MirrorImageEnabled.String() {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
for _, rbdVol := range volumes {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)

return nil, getGRPCError(err)
return nil, getGRPCError(err)
}
}
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
Expand Down Expand Up @@ -350,14 +349,12 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)

// extract the force option
force, err := getForceOption(ctx, req.GetParameters())
Expand Down Expand Up @@ -418,14 +415,12 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)

info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -475,10 +470,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
log.DebugLog(
ctx,
"Added scheduling at interval %s, start time %s for volume %s",
"Added scheduling at interval %s, start time %s for volumeId %s",
interval,
startTime,
rbdVol)
volumeID)
}

return &replication.PromoteVolumeResponse{}, nil
Expand Down Expand Up @@ -511,21 +506,12 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

creationTime, err := rbdVol.GetCreationTime()
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}
defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)

info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
Expand All @@ -544,18 +530,27 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

// demote image to secondary
if info.IsPrimary() {
// store the image creation time for resync
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol)
err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())
for _, vol := range volumes {
// store the image creation time for resync
creationTime, cErr := vol.GetCreationTime()
if cErr != nil {
log.ErrorLog(ctx, cErr.Error())

return nil, status.Error(codes.Internal, err.Error())
}
return nil, status.Error(codes.Internal, cErr.Error())
}

// store the image creation time for resync
_, err = vol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol)
err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}
}
err = mirror.Demote(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -621,14 +616,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)

info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -693,49 +686,56 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus())
}

creationTime, err := rbdVol.GetCreationTime()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
}

// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
// If the image gets resynced the local image creation time will be
// lost, if the keys is not present in the image metadata then we can
// assume that the image is already resynced.
savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
rbdVol,
err.Error())
}
for _, vol := range volumes {
creationTime, tErr := vol.GetCreationTime()
if tErr != nil {
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error())
}

if savedImageTime != "" {
st, sErr := timestampFromString(savedImageTime)
if sErr != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
// If the image gets resynced the local image creation time will be
// lost, if the keys is not present in the image metadata then we can
// assume that the image is already resynced.
var savedImageTime string
savedImageTime, err = vol.GetMetadata(imageCreationTimeKey)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
vol,
err.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)

if savedImageTime != "" {
st, sErr := timestampFromString(savedImageTime)
if sErr != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
}
// Break the loop as we need to issue resync only once for the image or for the group.
break
}
}
}

if !ready {
err = checkVolumeResyncStatus(ctx, localStatus)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

err = rbdVol.RepairResyncedImageID(ctx, ready)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
// update imageID for all the volumes
for _, vol := range volumes {
err = vol.RepairResyncedImageID(ctx, ready)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
}
}

resp := &replication.ResyncVolumeResponse{
Expand Down Expand Up @@ -828,23 +828,12 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, volumeID, req.GetReplicationSource())
if err != nil {
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, err.Error())
default:
err = status.Errorf(codes.Internal, err.Error())
}

return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, getGRPCError(err)
}
defer mirror.Destroy(ctx)
defer destoryVolumes(ctx, volumes)

info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -975,3 +964,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus)

return nil
}

// destoryVolumes destroys the connections of all non-nil volumes in the
// given slice. It is intended to be used with defer for cleanup.
//
// NOTE: the name carries a typo ("destory"), kept as-is for compatibility
// with existing call sites.
func destoryVolumes(ctx context.Context, volumes []types.Volume) {
	for _, vol := range volumes {
		// Callers pre-size the slice (make([]types.Volume, n)) and defer
		// this function before filling it; if resolving a volume fails
		// partway, the remaining entries are nil. Skip them to avoid a
		// panic during deferred cleanup.
		if vol == nil {
			continue
		}
		vol.Destroy(ctx)
	}
}
12 changes: 2 additions & 10 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

// resolve all volumes
volumes := make([]types.Volume, len(req.GetVolumeIds()))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
defer destoryVolumes(ctx, volumes)
for i, id := range req.GetVolumeIds() {
vol, err := mgr.GetVolumeByID(ctx, id)
if err != nil {
Expand Down Expand Up @@ -348,11 +344,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(

// resolve all volumes
volumes := make([]types.Volume, len(toAdd))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
defer destoryVolumes(ctx, volumes)
for i, id := range toAdd {
var vol types.Volume
vol, err = mgr.GetVolumeByID(ctx, id)
Expand Down
Loading

0 comments on commit 639ca8f

Please sign in to comment.