Skip to content

Commit

Permalink
core: initialize minio client at the beginning of restoring the backup (#556)
Browse files Browse the repository at this point in the history

Signed-off-by: huanghaoyuanhhy <[email protected]>
  • Loading branch information
huanghaoyuanhhy authored Feb 26, 2025
1 parent 2abf8c7 commit c816f18
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 73 deletions.
84 changes: 12 additions & 72 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/restore"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

Expand Down Expand Up @@ -331,8 +330,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
}
return asyncResp
} else {
endTask, err := b.executeRestoreBackupTask(ctx, backupBucketName, backupPath, backup, task)
resp.Data = endTask
err := b.executeRestoreBackupTask(ctx, backupBucketName, backupPath, backup, task)
resp.Data = task
if err != nil {
resp.Code = backuppb.ResponseCode_Fail
log.Error("execute restore collection fail", zap.String("backupId", backup.GetId()), zap.Error(err))
Expand All @@ -345,83 +344,24 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
}
}

func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBucketName string, backupPath string, backup *backuppb.BackupInfo, task *backuppb.RestoreBackupTask) (*backuppb.RestoreBackupTask, error) {
func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBucketName, backupPath string, backup *backuppb.BackupInfo, task *backuppb.RestoreBackupTask) error {
b.mu.Lock()
defer b.mu.Unlock()

wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
return task, err
}
wp.Start()
log.Info("Start collection level restore pool", zap.Int("parallelism", b.params.BackupCfg.RestoreParallelism))

id := task.GetId()
b.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_EXECUTING))
log.Info("executeRestoreBackupTask start",
zap.String("backup_name", backup.GetName()),
zap.String("backupBucketName", backupBucketName),
zap.String("backupPath", backupPath))

restoreCollectionTasks := task.GetCollectionRestoreTasks()

// 3, execute restoreCollectionTasks
for _, restoreCollectionTask := range restoreCollectionTasks {
restoreCollectionTaskClone := restoreCollectionTask
job := func(ctx context.Context) error {
endTask, err := b.executeRestoreCollectionTask(ctx, backupBucketName, backupPath, restoreCollectionTaskClone, id)
if err != nil {
b.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_FAIL),
meta.SetRestoreErrorMessage(endTask.ErrorMessage))
b.meta.UpdateRestoreCollectionTask(id, endTask.Id,
meta.SetRestoreCollectionStateCode(backuppb.RestoreTaskStateCode_FAIL),
meta.SetRestoreCollectionErrorMessage(endTask.ErrorMessage))
log.Error("executeRestoreCollectionTask failed",
zap.String("TargetDBName", restoreCollectionTaskClone.GetTargetDbName()),
zap.String("TargetCollectionName", restoreCollectionTaskClone.GetTargetCollectionName()),
zap.Error(err))
return err
}

restoreCollectionTaskClone.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
log.Info("finish restore collection",
zap.String("db_name", restoreCollectionTaskClone.GetTargetDbName()),
zap.String("collection_name", restoreCollectionTaskClone.GetTargetCollectionName()),
zap.Int64("size", endTask.RestoredSize))
return nil
}
wp.Submit(job)
}
wp.Done()
if err := wp.Wait(); err != nil {
return task, err
}

endTime := time.Now().Unix()
task.EndTime = endTime
b.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), meta.SetRestoreEndTime(endTime))

log.Info("finish restore all collections",
zap.String("backupName", backup.GetName()),
zap.Int("collections", len(backup.GetCollectionBackups())),
zap.String("taskID", task.GetId()),
zap.Int64("duration in seconds", task.GetEndTime()-task.GetStartTime()))
return task, nil
}

func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backupBucketName string, backupPath string, task *backuppb.RestoreCollectionTask, parentTaskID string) (*backuppb.RestoreCollectionTask, error) {
collTask := restore.NewCollectionTask(task,
b.meta,
b.params,
parentTaskID,
backupBucketName,
restoreBackupTask := restore.NewTask(task,
backupPath,
backupBucketName,
b.params,
backup,
b.meta,
b.getBackupStorageClient(),
b.getMilvusStorageClient(),
b.getMilvusClient(),
b.getRestfulClient())

err := collTask.Execute(ctx)
if err := restoreBackupTask.Execute(ctx, task); err != nil {
return fmt.Errorf("backup: execute restore collection fail, err: %w", err)
}

return task, err
return nil
}
2 changes: 1 addition & 1 deletion core/restore/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type CollectionTask struct {
logger *zap.Logger
}

func NewCollectionTask(task *backuppb.RestoreCollectionTask,
func newCollectionTask(task *backuppb.RestoreCollectionTask,
meta *meta.MetaManager,
params *paramtable.BackupParams,
parentTaskID,
Expand Down
31 changes: 31 additions & 0 deletions core/restore/collection_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,32 @@
package restore

import (
"testing"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/stretchr/testify/assert"
)

func TestGetFailedReason(t *testing.T) {
t.Run("Normal", func(t *testing.T) {
r := getFailedReason([]*commonpb.KeyValuePair{{Key: "failed_reason", Value: "hello"}})
assert.Equal(t, "hello", r)
})

t.Run("WithoutFailedReason", func(t *testing.T) {
r := getFailedReason([]*commonpb.KeyValuePair{{Key: "hello", Value: "world"}})
assert.Equal(t, "", r)
})
}

func TestGetProcess(t *testing.T) {
t.Run("Normal", func(t *testing.T) {
r := getProcess([]*commonpb.KeyValuePair{{Key: "progress_percent", Value: "100"}})
assert.Equal(t, 100, r)
})

t.Run("WithoutProgress", func(t *testing.T) {
r := getProcess([]*commonpb.KeyValuePair{{Key: "hello", Value: "world"}})
assert.Equal(t, 0, r)
})
}
144 changes: 144 additions & 0 deletions core/restore/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package restore

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"github.com/zilliztech/milvus-backup/core/client"
"github.com/zilliztech/milvus-backup/core/meta"
"github.com/zilliztech/milvus-backup/core/paramtable"
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

// Task drives the restore of one backup. It bundles the restore task record,
// the backup description, configuration, and the storage/Milvus clients that
// are passed on to every per-collection restore task it creates.
type Task struct {
	// logger is created in NewTask with the backup name, path and bucket attached.
	logger *zap.Logger

	// task is the restore task record; its ID is used as the parent task ID
	// when collection tasks are created.
	task *backuppb.RestoreBackupTask
	// info describes the backup being restored.
	info *backuppb.BackupInfo

	params *paramtable.BackupParams
	meta   *meta.MetaManager

	// Storage handles forwarded to each collection task.
	backupStorage, milvusStorage storage.ChunkManager

	// Milvus clients forwarded to each collection task.
	grpcCli    client.Grpc
	restfulCli client.RestfulBulkInsert

	// Location of the backup, forwarded to each collection task.
	backupBucketName, backupPath string
}

// NewTask assembles a restore Task from the restore task record, the backup
// location and description, the configuration, the meta manager and the
// storage/Milvus clients. The returned Task carries a logger tagged with the
// backup name, backup path and backup bucket name.
func NewTask(task *backuppb.RestoreBackupTask,
	backupPath, backupBucketName string,
	params *paramtable.BackupParams,
	info *backuppb.BackupInfo,
	meta *meta.MetaManager,
	backupStorage, milvusStorage storage.ChunkManager,
	grpcCli client.Grpc,
	restfulCli client.RestfulBulkInsert,
) *Task {
	restoreTask := &Task{
		task:             task,
		info:             info,
		params:           params,
		meta:             meta,
		backupStorage:    backupStorage,
		milvusStorage:    milvusStorage,
		grpcCli:          grpcCli,
		restfulCli:       restfulCli,
		backupBucketName: backupBucketName,
		backupPath:       backupPath,
	}

	// Every log line emitted through the task logger identifies the backup.
	restoreTask.logger = log.L().With(
		zap.String("backup_name", info.GetName()),
		zap.String("backup_path", backupPath),
		zap.String("backup_bucket_name", backupBucketName),
	)

	return restoreTask
}

// Execute restores every collection listed in task.
//
// It starts a worker pool sized by BackupCfg.RestoreParallelism, marks the
// restore task as EXECUTING, submits one job per collection restore task and
// waits for all of them. When a collection fails, both the restore task and
// that collection task are marked FAIL in the meta manager and the error is
// returned (wrapped). When all jobs succeed, task.EndTime is set and the
// restore task is marked SUCCESS.
//
// NOTE(review): the worker pool is created with 0 as its third argument,
// whereas the code this replaces passed RPS — confirm 0 is intended.
func (t *Task) Execute(ctx context.Context, task *backuppb.RestoreBackupTask) error {
	t.logger.Info("start restore backup")

	wp, err := common.NewWorkerPool(ctx, t.params.BackupCfg.RestoreParallelism, 0)
	if err != nil {
		return fmt.Errorf("restore: create collection worker pool %w", err)
	}
	wp.Start()
	t.logger.Info("Start collection level restore pool", zap.Int("parallelism", t.params.BackupCfg.RestoreParallelism))

	id := task.GetId()
	t.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_EXECUTING))

	for _, collTaskMeta := range task.GetCollectionRestoreTasks() {
		// Copy the range variable: the job below runs later on the pool, and
		// before Go 1.22 all iterations share a single loop variable, so the
		// closure would otherwise observe a different collection's metadata.
		collTaskMeta := collTaskMeta
		collTask := t.newRestoreCollTask(collTaskMeta)

		job := func(ctx context.Context) error {
			if err := collTask.Execute(ctx); err != nil {
				// Prefer the message recorded on the collection task; fall
				// back to the error text so a failure is never stored with an
				// empty message.
				errMsg := collTaskMeta.GetErrorMessage()
				if errMsg == "" {
					errMsg = err.Error()
				}

				t.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_FAIL),
					meta.SetRestoreErrorMessage(errMsg))

				t.meta.UpdateRestoreCollectionTask(id, collTaskMeta.GetId(),
					meta.SetRestoreCollectionStateCode(backuppb.RestoreTaskStateCode_FAIL),
					meta.SetRestoreCollectionErrorMessage(errMsg))

				t.logger.Error("restore coll failed",
					zap.String("target_db_name", collTaskMeta.GetTargetDbName()),
					zap.String("TargetCollectionName", collTaskMeta.GetTargetCollectionName()),
					zap.Error(err))
				return fmt.Errorf("restore: restore collection %w", err)
			}

			collTaskMeta.StateCode = backuppb.RestoreTaskStateCode_SUCCESS
			// Use the task logger (not the global one) so this line carries
			// the same backup fields as the rest of the restore logs.
			t.logger.Info("finish restore collection",
				zap.String("target_db_name", collTaskMeta.GetTargetDbName()),
				zap.String("target_collection_name", collTaskMeta.GetTargetCollectionName()),
				zap.Int64("size", collTaskMeta.RestoredSize))
			return nil
		}
		wp.Submit(job)
	}

	// No more jobs will be submitted; wait for the submitted ones to finish.
	wp.Done()
	if err := wp.Wait(); err != nil {
		return fmt.Errorf("restore: wait collection worker pool %w", err)
	}

	endTime := time.Now().Unix()
	task.EndTime = endTime
	t.meta.UpdateRestoreTask(id, meta.SetRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), meta.SetRestoreEndTime(endTime))

	t.logger.Info("finish restore all collections",
		zap.String("backup_name", t.info.GetName()),
		zap.Int("collection_num", len(t.info.GetCollectionBackups())),
		zap.String("task_id", task.GetId()),
		// Start and end times are Unix seconds, so the difference is scaled to a Duration.
		zap.Duration("duration", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
	return nil
}

// newRestoreCollTask builds the CollectionTask for a single collection restore
// record, passing along this Task's meta manager, params, backup location and
// clients, with this Task's restore task ID as the parent task ID.
func (t *Task) newRestoreCollTask(collTask *backuppb.RestoreCollectionTask) *CollectionTask {
	parentTaskID := t.task.GetId()

	return newCollectionTask(
		collTask,
		t.meta, t.params,
		parentTaskID,
		t.backupBucketName, t.backupPath,
		t.backupStorage, t.milvusStorage,
		t.grpcCli, t.restfulCli,
	)
}

0 comments on commit c816f18

Please sign in to comment.