Skip to content

Commit

Permalink
Add retry in backup prepare & Fix lost backup size bug (#284)
Browse files Browse the repository at this point in the history
* Add log to print Flush error

Signed-off-by: wayblink <[email protected]>

* Add retry in backup prepare

Signed-off-by: wayblink <[email protected]>

* Fix lost backup size bug

Signed-off-by: wayblink <[email protected]>

---------

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jan 30, 2024
1 parent 7845b38 commit 6d5968b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
19 changes: 9 additions & 10 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/log"
"github.com/zilliztech/milvus-backup/internal/util/retry"
)

func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest) *backuppb.BackupInfoResponse {
Expand Down Expand Up @@ -300,7 +301,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
StateCode: backuppb.BackupTaskStateCode_BACKUP_INITIAL,
StartTime: time.Now().Unix(),
CollectionId: completeCollection.ID,
DbName: collection.db, // todo currently db_name is not used in many places
DbName: collection.db,
CollectionName: completeCollection.Name,
Schema: schema,
ShardsNum: completeCollection.ShardNum,
Expand Down Expand Up @@ -368,7 +369,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)))
newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false)
if err != nil {
log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()))
log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()), zap.Error(err))
return err
}
log.Info("flush segments",
Expand Down Expand Up @@ -416,6 +417,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
// Flush
segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName())
if err != nil {
log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()), zap.Error(err))
return err
}
log.Info("GetPersistentSegmentInfo from milvus",
Expand Down Expand Up @@ -544,7 +546,9 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
for _, collection := range toBackupCollections {
collectionClone := collection
job := func(ctx context.Context) error {
err := b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce())
err := retry.Do(ctx, func() error {
return b.backupCollectionPrepare(ctx, backupInfo, collectionClone, request.GetForce())
}, retry.Sleep(120*time.Second), retry.Attempts(128))
return err
}
jobId := b.getBackupCollectionWorkerPool().SubmitWithId(job)
Expand Down Expand Up @@ -578,17 +582,12 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
return backupInfo, err
}

var backupSize int64 = 0
leveledBackupInfo, err := treeToLevel(backupInfo)
_, err := treeToLevel(backupInfo)
if err != nil {
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
backupInfo.ErrorMessage = err.Error()
return backupInfo, err
}
for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() {
backupSize += coll.GetSize()
}
backupInfo.Size = backupSize
backupInfo.EndTime = time.Now().UnixNano() / int64(time.Millisecond)
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS
} else {
Expand Down Expand Up @@ -762,7 +761,7 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
if len(fieldsLogDir) == 0 {
msg := fmt.Sprint("Get empty input path, but segment should not be empty, %s", insertPath)
msg := fmt.Sprintf("Get empty input path, but segment should not be empty, %s", insertPath)
return segmentBackupInfo, errors.New(msg)
}
if err != nil {
Expand Down
42 changes: 26 additions & 16 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,31 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
collections := make([]*backuppb.CollectionBackupInfo, 0)
partitions := make([]*backuppb.PartitionBackupInfo, 0)
segments := make([]*backuppb.SegmentBackupInfo, 0)

// recalculate backup size
var backupSize int64 = 0
for _, collectionBack := range backup.GetCollectionBackups() {
// recalculate backup size
var collectionSize int64 = 0
for _, partitionBack := range collectionBack.GetPartitionBackups() {
// recalculate backup size
var partitionSize int64 = 0
for _, segmentBack := range partitionBack.GetSegmentBackups() {
segments = append(segments, segmentBack)
partitionSize = partitionSize + segmentBack.GetSize()
}
partitionBack.Size = partitionSize
clonePartitionBackupInfo := &backuppb.PartitionBackupInfo{
PartitionId: partitionBack.GetPartitionId(),
PartitionName: partitionBack.GetPartitionName(),
CollectionId: partitionBack.GetCollectionId(),
Size: partitionBack.GetSize(),
LoadState: partitionBack.GetLoadState(),
}
partitions = append(partitions, clonePartitionBackupInfo)
collectionSize = collectionSize + partitionSize
}

collectionBack.Size = collectionSize
cloneCollectionBackup := &backuppb.CollectionBackupInfo{
CollectionId: collectionBack.GetCollectionId(),
DbName: collectionBack.GetDbName(),
Expand All @@ -65,21 +88,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
BackupPhysicalTimestamp: collectionBack.GetBackupPhysicalTimestamp(),
}
collections = append(collections, cloneCollectionBackup)

for _, partitionBack := range collectionBack.GetPartitionBackups() {
clonePartitionBackupInfo := &backuppb.PartitionBackupInfo{
PartitionId: partitionBack.GetPartitionId(),
PartitionName: partitionBack.GetPartitionName(),
CollectionId: partitionBack.GetCollectionId(),
Size: partitionBack.GetSize(),
LoadState: partitionBack.GetLoadState(),
}
partitions = append(partitions, clonePartitionBackupInfo)

for _, segmentBack := range partitionBack.GetSegmentBackups() {
segments = append(segments, segmentBack)
}
}
backupSize = backupSize + collectionSize
}

collectionLevel := &backuppb.CollectionLevelBackupInfo{
Expand All @@ -91,6 +100,7 @@ func treeToLevel(backup *backuppb.BackupInfo) (LeveledBackupInfo, error) {
segmentLevel := &backuppb.SegmentLevelBackupInfo{
Infos: segments,
}
backup.Size = backupSize
backupLevel := &backuppb.BackupInfo{
Id: backup.GetId(),
StateCode: backup.GetStateCode(),
Expand Down

0 comments on commit 6d5968b

Please sign in to comment.