Skip to content

Commit

Permalink
client, restore: use end_ts as key instead of end_time (#544)
Browse files Browse the repository at this point in the history
Signed-off-by: huanghaoyuanhhy <[email protected]>
  • Loading branch information
huanghaoyuanhhy authored Feb 20, 2025
1 parent d42df3f commit 8779dee
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
6 changes: 3 additions & 3 deletions core/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,15 @@ type GrpcBulkInsertInput struct {
CollectionName string
PartitionName string
Paths []string // offset 0 is path to insertLog file, offset 1 is path to deleteLog file
EndTime int64
BackupTS uint64
IsL0 bool
}

func (g *GrpcClient) BulkInsert(ctx context.Context, input GrpcBulkInsertInput) (int64, error) {
ctx = g.newCtxWithDB(ctx, input.DB)
var opts []*commonpb.KeyValuePair
if input.EndTime > 0 {
opts = append(opts, &commonpb.KeyValuePair{Key: "end_time", Value: strconv.FormatInt(input.EndTime, 10)})
if input.BackupTS > 0 {
opts = append(opts, &commonpb.KeyValuePair{Key: "end_ts", Value: strconv.FormatUint(input.BackupTS, 10)})
}
if input.IsL0 {
opts = append(opts, &commonpb.KeyValuePair{Key: "l0_import", Value: "true"})
Expand Down
6 changes: 3 additions & 3 deletions core/client/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type RestfulBulkInsertInput struct {
CollectionName string
PartitionName string
Paths [][]string // offset 0 is path to insertLog file, offset 1 is path to deleteLog file
EndTime int64
BackupTS uint64
IsL0 bool
}

Expand Down Expand Up @@ -97,8 +97,8 @@ type RestfulClient struct {

func (r *RestfulClient) BulkInsert(ctx context.Context, input RestfulBulkInsertInput) (string, error) {
opts := make(map[string]string)
if input.EndTime > 0 {
opts["end_time"] = strconv.FormatInt(input.EndTime, 10)
if input.BackupTS > 0 {
opts["end_ts"] = strconv.FormatUint(input.BackupTS, 10)
}
if input.IsL0 {
opts["l0_import"] = "true"
Expand Down
7 changes: 4 additions & 3 deletions core/restore/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,8 @@ func (ct *CollectionTask) copyFiles(ctx context.Context, paths []string) ([]stri
tempPaths := make([]string, 0, len(paths))
tempDir := fmt.Sprintf("restore-temp-%s-%s-%s/", ct.parentTaskID, ct.task.GetTargetDbName(), ct.task.GetTargetCollectionName())
ct.logger.Info("milvus and backup store in different bucket, copy the data first",
zap.Strings("paths", paths), zap.String("copy_data_path", tempDir))
zap.Strings("paths", paths),
zap.String("copy_data_path", tempDir))
for _, p := range paths {
// empty delta path, no need to copy
if len(p) == 0 {
Expand Down Expand Up @@ -761,7 +762,7 @@ func (ct *CollectionTask) bulkInsertViaGrpc(ctx context.Context, partition strin
CollectionName: ct.task.GetTargetCollectionName(),
PartitionName: partition,
Paths: paths,
EndTime: ct.task.GetCollBackup().EndTime,
BackupTS: ct.task.GetCollBackup().BackupTimestamp,
IsL0: isL0,
}
jobID, err := ct.grpcCli.BulkInsert(ctx, in)
Expand Down Expand Up @@ -810,7 +811,7 @@ func (ct *CollectionTask) bulkInsertViaRestful(ctx context.Context, partition st
CollectionName: ct.task.GetTargetCollectionName(),
PartitionName: partition,
Paths: paths,
EndTime: ct.task.GetCollBackup().EndTime,
BackupTS: ct.task.GetCollBackup().BackupTimestamp,
IsL0: isL0,
}
jobID, err := ct.restfulCli.BulkInsert(ctx, in)
Expand Down

0 comments on commit 8779dee

Please sign in to comment.