Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use restful api to create bulk insert job #503

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ all: gen build

# Build the binary
build:
@echo "Building binary..."
GO111MODULE=on CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o $(BINARY_NAME)
@echo "Building Backup binary..."
@echo "Version: $(VERSION)"
@echo "Commit: $(COMMIT)"
@echo "Date: $(DATE)"
@GO111MODULE=on CGO_ENABLED=0 go build -ldflags '$(LDFLAGS)' -o $(BINARY_NAME)

gen:
./scripts/gen_swag.sh
Expand Down
24 changes: 19 additions & 5 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type BackupContext struct {
bulkinsertWorkerPools sync.Map
}

func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (client.Grpc, error) {
func paramsToCfg(params *paramtable.BackupParams) (*client.Cfg, error) {
ep := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Milvus client", zap.String("endpoint", ep))

Expand All @@ -76,6 +76,7 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
enableTLS = true
default:
log.Error("milvus.TLSMode is illegal, support value 0, 1, 2")
return nil, fmt.Errorf("milvus.TLSMode is illegal, support value 0, 1, 2")
}

cfg := &client.Cfg{
Expand All @@ -84,6 +85,17 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
Username: params.MilvusCfg.User,
Password: params.MilvusCfg.Password,
}

return cfg, nil
}

func CreateGrpcClient(params *paramtable.BackupParams) (client.Grpc, error) {
cfg, err := paramsToCfg(params)
if err != nil {
log.Error("failed to create milvus client", zap.Error(err))
return nil, fmt.Errorf("failed to create milvus client: %w", err)
}

cli, err := client.NewGrpc(cfg)
if err != nil {
log.Error("failed to create milvus client", zap.Error(err))
Expand All @@ -93,10 +105,12 @@ func CreateMilvusClient(ctx context.Context, params *paramtable.BackupParams) (c
}

func CreateRestfulClient(params *paramtable.BackupParams) (client.Restful, error) {
ep := params.MilvusCfg.Address + ":" + params.MilvusCfg.Port
log.Debug("Start Restful client", zap.String("endpoint", ep))
cfg, err := paramsToCfg(params)
if err != nil {
log.Error("failed to create restful client", zap.Error(err))
return nil, fmt.Errorf("failed to create restful client: %w", err)
}

cfg := &client.Cfg{Address: ep, Username: params.MilvusCfg.User, Password: params.MilvusCfg.Password}
cli, err := client.NewRestful(cfg)
if err != nil {
log.Error("failed to create restful client", zap.Error(err))
Expand Down Expand Up @@ -136,7 +150,7 @@ func CreateBackupContext(ctx context.Context, params *paramtable.BackupParams) *

func (b *BackupContext) getMilvusClient() client.Grpc {
if b.grpcClient == nil {
milvusClient, err := CreateMilvusClient(b.ctx, b.params)
milvusClient, err := CreateGrpcClient(b.params)
if err != nil {
log.Error("failed to initial milvus client", zap.Error(err))
panic(err)
Expand Down
4 changes: 4 additions & 0 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
zap.String("path", request.GetPath()),
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)),
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()),
zap.Any("skipParams", request.GetSkipParams()),
zap.Bool("useV2Restore", request.GetUseV2Restore()),
zap.Int32("maxShardNum", request.GetMaxShardNum()))

resp := &backuppb.RestoreBackupResponse{
Expand Down Expand Up @@ -311,6 +313,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
SkipCreateCollection: request.GetSkipCreateCollection(),
SkipDiskQuotaCheck: request.GetSkipImportDiskQuotaCheck(),
MaxShardNum: request.GetMaxShardNum(),
SkipParams: request.GetSkipParams(),
UseV2Restore: request.GetUseV2Restore(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down
6 changes: 3 additions & 3 deletions core/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Grpc interface {
Flush(ctx context.Context, db, collName string) (*milvuspb.FlushResponse, error)
ListCollections(ctx context.Context, db string) (*milvuspb.ShowCollectionsResponse, error)
HasCollection(ctx context.Context, db, collName string) (bool, error)
BulkInsert(ctx context.Context, input BulkInsertInput) (int64, error)
BulkInsert(ctx context.Context, input GrpcBulkInsertInput) (int64, error)
GetBulkInsertState(ctx context.Context, taskID int64) (*milvuspb.GetImportStateResponse, error)
CreateCollection(ctx context.Context, input CreateCollectionInput) error
CreatePartition(ctx context.Context, db, collName, partitionName string) error
Expand Down Expand Up @@ -355,7 +355,7 @@ func (m *GrpcClient) HasCollection(ctx context.Context, db, collName string) (bo
return resp.GetValue(), nil
}

type BulkInsertInput struct {
type GrpcBulkInsertInput struct {
DB string
CollectionName string
PartitionName string
Expand All @@ -366,7 +366,7 @@ type BulkInsertInput struct {
SkipDiskQuotaCheck bool
}

func (m *GrpcClient) BulkInsert(ctx context.Context, input BulkInsertInput) (int64, error) {
func (m *GrpcClient) BulkInsert(ctx context.Context, input GrpcBulkInsertInput) (int64, error) {
ctx = m.newCtxWithDB(ctx, input.DB)
var opts []*commonpb.KeyValuePair
if input.EndTime > 0 {
Expand Down
46 changes: 33 additions & 13 deletions core/client/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,28 @@ import (
"github.com/zilliztech/milvus-backup/internal/log"
)

// ImportState is a string-typed label for the state of an import job.
// NOTE(review): presumably these match the state strings reported by the
// RESTful import-job progress endpoint — confirm against the server response.
type ImportState string

// Declared ImportState values.
const (
// ImportStatePending is the "Pending" state.
ImportStatePending ImportState = "Pending"
// ImportStateImporting is the "Importing" state.
ImportStateImporting ImportState = "Importing"
// ImportStateCompleted is the "Completed" state.
ImportStateCompleted ImportState = "Completed"
// ImportStateFailed is the "Failed" state.
ImportStateFailed ImportState = "Failed"
)

// RestfulBulkInsertInput bundles the arguments accepted by Restful.BulkInsert.
type RestfulBulkInsertInput struct {
// DB is sent as the request's database name.
DB string
// CollectionName is sent as the request's collection name.
CollectionName string
// PartitionName is sent as the request's partition name.
PartitionName string
// Paths is sent as the request's file list; each inner slice is one group of paths.
// offset 0 is path to insertLog file, offset 1 is path to deleteLog file
Paths [][]string
// EndTime is forwarded as the "end_time" option only when it is greater than zero.
EndTime int64
// IsL0 selects the "l0_import" option when true; otherwise the "backup" option is set.
IsL0 bool
// SkipDiskQuotaCheck is always forwarded as the "skip_disk_quota_check" option.
SkipDiskQuotaCheck bool
}

type Restful interface {
BulkInsert(ctx context.Context, db, collName, partitionName string, files []string, endTime int64, isL0 bool, skipDiskQuotaCheck bool) (string, error)
BulkInsert(ctx context.Context, input RestfulBulkInsertInput) (string, error)
GetBulkInsertState(ctx context.Context, db, jobID string) (*GetProcessResp, error)
}

Expand Down Expand Up @@ -73,27 +93,27 @@ type RestfulClient struct {
cli *req.Client
}

func (r *RestfulClient) BulkInsert(ctx context.Context, db, collName, partitionName string, files []string, endTime int64, isL0 bool, skipDiskQuotaCheck bool) (string, error) {
func (r *RestfulClient) BulkInsert(ctx context.Context, input RestfulBulkInsertInput) (string, error) {
opts := make(map[string]string)
if endTime > 0 {
opts["end_time"] = strconv.FormatInt(endTime, 10)
if input.EndTime > 0 {
opts["end_time"] = strconv.FormatInt(input.EndTime, 10)
}
if isL0 {
if input.IsL0 {
opts["l0_import"] = "true"
} else {
opts["backup"] = "true"
}
opts["skip_disk_quota_check"] = strconv.FormatBool(skipDiskQuotaCheck)
opts["skip_disk_quota_check"] = strconv.FormatBool(input.SkipDiskQuotaCheck)

createReq := createImportReq{
DbName: db,
CollectionName: collName,
PartitionName: partitionName,
Files: [][]string{files},
DbName: input.DB,
CollectionName: input.CollectionName,
PartitionName: input.PartitionName,
Files: input.Paths,
Options: opts,
}
var createResp createImportResp
log.Info("create import job via restful", zap.Any("createReq", createReq))
log.Debug("create import job via restful", zap.Any("createReq", createReq))
resp, err := r.cli.R().
SetContext(ctx).
SetBody(createReq).
Expand All @@ -102,7 +122,7 @@ func (r *RestfulClient) BulkInsert(ctx context.Context, db, collName, partitionN
if err != nil {
return "", fmt.Errorf("client: failed to create import job via restful: %w", err)
}
log.Info("create import job via restful", zap.Any("createResp", resp))
log.Debug("create import job via restful", zap.Any("createResp", resp))
if resp.IsErrorState() {
return "", fmt.Errorf("client: failed to create import job via restful: %v", resp)
}
Expand All @@ -125,7 +145,7 @@ func (r *RestfulClient) GetBulkInsertState(ctx context.Context, dbName, jobID st
if err != nil {
return nil, fmt.Errorf("client: failed to get import job state via restful: %w", err)
}
log.Info("get import job state via restful", zap.Any("getResp", resp))
log.Debug("get import job state via restful", zap.Any("getResp", resp))
if resp.IsErrorState() {
return nil, fmt.Errorf("client: failed to get import job state via restful: %v", resp)
}
Expand Down
14 changes: 8 additions & 6 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ enum RestoreTaskStateCode {
}

message SkipParams {
repeated string collection_properties = 1;
repeated string collectionProperties = 1;

repeated string filed_index_params = 2;
repeated string filed_type_params = 3;
repeated string fieldIndexParams = 2;
repeated string fieldTypeParams = 3;

repeated string index_params = 4;
repeated string indexParams = 4;
}

message RestoreBackupRequest {
Expand Down Expand Up @@ -295,7 +295,8 @@ message RestoreBackupRequest {
// target max shard number
int32 maxShardNum = 19;
// if key is set, will skip the params in restore process
SkipParams skip_params = 20;
SkipParams skipParams = 20;
bool useV2Restore = 21;
}

message RestorePartitionTask {
Expand Down Expand Up @@ -336,7 +337,8 @@ message RestoreCollectionTask {
bool skipDiskQuotaCheck = 19;
// target max shard number
int32 maxShardNum = 20;
SkipParams skip_params = 21;
SkipParams skipParams = 21;
bool useV2Restore = 22;
}

message RestoreBackupTask {
Expand Down
Loading
Loading