From e902968465ead97b5a55cc2fbbdb0928383759aa Mon Sep 17 00:00:00 2001 From: arrebole Date: Wed, 24 May 2023 17:29:12 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feature:=20=E5=A2=9E=E5=8A=A0=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E5=88=86=E7=89=87=E4=B8=8A=E4=BC=A0=E5=92=8C?= =?UTF-8?q?=E6=96=AD=E7=BB=AD=E4=B8=8A=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cache/cache.go | 55 +++++++++++++ cache/upload.go | 145 +++++++++++++++++++++++++++++++++++ cache/upload_test.go | 66 ++++++++++++++++ commands.go | 4 + db.go | 34 ++++---- go.mod | 1 + go.sum | 2 + partial/chunk.go | 9 --- partial/downloader.go | 13 +++- partial/uploader.go | 109 ++++++++++++++++++++++++++ partial/uploader_test.go | 61 +++++++++++++++ session.go | 162 +++++++++++++++++++++++++++++++++------ 12 files changed, 610 insertions(+), 51 deletions(-) create mode 100644 cache/cache.go create mode 100644 cache/upload.go create mode 100644 cache/upload_test.go create mode 100644 partial/uploader.go create mode 100644 partial/uploader_test.go diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..e766f7e --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,55 @@ +package cache + +import ( + "os" + "path/filepath" + "runtime" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// 存储本地数据库连接 +var db *leveldb.DB + +func GetDBName() string { + if runtime.GOOS == "windows" { + return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db") + } + return filepath.Join(os.Getenv("HOME"), ".upx.db") +} + +func GetClient() (*leveldb.DB, error) { + var err error + if db == nil { + db, err = leveldb.OpenFile(GetDBName(), nil) + } + return db, err +} + +func Delete(key string) error { + db, err := GetClient() + if err != nil { + return err + } + return db.Delete([]byte(key), nil) +} + +func Range(scoop string, fn func(key []byte, data []byte)) error { + db, err := GetClient() + if err != nil { + return err + } + + iter := db.NewIterator( + util.BytesPrefix([]byte(scoop)), + nil, + ) + + for iter.Next() { + fn(iter.Key(), iter.Value()) + } + + iter.Release() + return iter.Error() +} diff --git a/cache/upload.go b/cache/upload.go new file mode 100644 index 0000000..5febb50 --- /dev/null +++ b/cache/upload.go @@ -0,0 +1,145 @@ +package cache + +import ( + "crypto/md5" + "encoding/json" + "fmt" + "time" +) + +// 分片上传任务 +type MutUpload struct { + UploadID string + + // 文件总计大小 + Size int64 + + // 分块大小 + PartSize int64 + + // 本都文件路径 + Path string + + // 云端文件路径 + UpPath string + + // 上传时间 + CreateAt time.Time +} + +func (p *MutUpload) Key() string { + fingerprint := fmt.Sprintf( + "%s:%s:%d:%d", + p.Path, + p.UpPath, + p.Size, + p.PartSize, + ) + + return fmt.Sprintf( + "mutupload-%x", + md5.Sum([]byte(fingerprint)), + ) +} + +// 查询分片上传任务 +func FindMutUpload(fn func(key string, entity *MutUpload) bool) ([]*MutUpload, error) { + var result []*MutUpload + err := Range("mutupload-", func(key []byte, value []byte) { + var item = &MutUpload{} + if err := json.Unmarshal(value, item); err != nil { + db.Delete(key, nil) + return + } + + // 删除过期的分片上传记录 + if time.Since(item.CreateAt).Hours() > 12 { + FindMutUploadPart(func(key string, part *MutUploadPart) bool { + if part.UploadID == item.UploadID { + db.Delete([]byte(key), nil) + } + return false + }) + db.Delete(key, nil) + } + + if fn(string(key), item) { + result = append(result, item) + } + }) + return result, err +} + +// 添加分片上传 +func AddMutUpload(entity *MutUpload) error { + db, err := GetClient() + if err != nil { + return err + } + + data, err := json.Marshal(entity) + if err != nil { + return err + } + + return db.Put([]byte(entity.Key()), data, nil) +} + +// 分片上传任务下的具体分片信息 +type MutUploadPart struct { + UploadID string + PartId int64 + Len int64 +} + +func (p *MutUploadPart) Key() string { + return fmt.Sprintf("part-%s-%d", p.UploadID, p.PartId) +} + +// 获取已经上传的分片 +func FindMutUploadPart(fn func(key string, entity *MutUploadPart) bool) ([]*MutUploadPart, error) { + var result []*MutUploadPart + err := Range("part-", func(key []byte, value []byte) { + var item = &MutUploadPart{} + if err := json.Unmarshal(value, item); err != nil { + db.Delete(key, nil) + return + } + + if fn(string(key), item) { + result = append(result, item) + } + }) + return result, err +} + +// 记录已经上传的分片 +func AddMutUploadPart(entity *MutUploadPart) error { + db, err := GetClient() + if err != nil { + return err + } + + data, err := json.Marshal(entity) + if err != nil { + return err + } + + return db.Put([]byte(entity.Key()), data, nil) +} + +func DeleteByUploadID(uploadID string) error { + FindMutUpload(func(key string, entity *MutUpload) bool { + if entity.UploadID == uploadID { + Delete(key) + } + return false + }) + FindMutUploadPart(func(key string, entity *MutUploadPart) bool { + if entity.UploadID == uploadID { + Delete(key) + } + return false + }) + return nil +} diff --git a/cache/upload_test.go b/cache/upload_test.go new file mode 100644 index 0000000..410c3b0 --- /dev/null +++ b/cache/upload_test.go @@ -0,0 +1,66 @@ +package cache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMutUpload(t *testing.T) { + mutUpload := &MutUpload{ + UploadID: "1", + Size: 100 * 12, + PartSize: 100, + Path: "a.jpg", + UpPath: "b.jpg", + CreateAt: time.Now(), + } + assert.NoError(t, AddMutUpload(mutUpload)) + assert.NoError(t, AddMutUpload(&MutUpload{ + UploadID: "2", + Size: 100 * 12, + PartSize: 100, + Path: "/c/a.jpg", + UpPath: "b.jpg", + CreateAt: time.Now(), + })) + results, err := FindMutUpload(func(key string, entity *MutUpload) bool { + return key == mutUpload.Key() + }) + + assert.NoError(t, err) + assert.Equal(t, len(results), 1) + assert.Equal( + t, + results[0].Key(), + mutUpload.Key(), + ) +} + +func TestMutUploadPart(t *testing.T) { + part1s := []int64{} + for i := 0; i < 100; i++ { + part1s = append(part1s, int64(i)) + } + + for _, v := range part1s { + err := AddMutUploadPart(&MutUploadPart{ + UploadID: "1", + PartId: v, + Len: 100, + }) + assert.NoError(t, err) + } + + part2s := []int64{} + records, err := FindMutUploadPart(func(key string, entity *MutUploadPart) bool { + return entity.UploadID == "1" + }) + assert.NoError(t, err) + for _, v := range records { + part2s = append(part2s, v.PartId) + } + + assert.ElementsMatch(t, part1s, part2s) +} diff --git a/commands.go b/commands.go index 13cf522..819eb00 100644 --- a/commands.go +++ b/commands.go @@ -348,10 +348,12 @@ func NewPutCommand() cli.Command { upPath, c.Int("w"), c.Bool("all"), + c.Bool("c"), ) return nil }, Flags: []cli.Flag{ + cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"}, cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5}, cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"}, }, @@ -373,10 +375,12 @@ func NewUploadCommand() cli.Command { c.String("remote"), c.Int("w"), c.Bool("all"), + c.Bool("c"), ) return nil }, Flags: []cli.Flag{ + cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"}, cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"}, cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5}, cli.StringFlag{Name: "remote", Usage: "remote path", Value: "./"}, diff --git a/db.go b/db.go index 8ac51b5..895cf5d 100644 --- a/db.go +++ b/db.go @@ -6,10 +6,11 @@ import ( "os" "path" "path/filepath" - "runtime" "strings" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" + "github.com/upyun/upx/cache" ) var db *leveldb.DB @@ -31,13 +32,6 @@ type dbValue struct { Items []*fileMeta `json:"items"` } -func getDBName() string { - if runtime.GOOS == "windows" { - return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db") - } - return filepath.Join(os.Getenv("HOME"), ".upx.db") -} - func makeDBKey(src, dst string) ([]byte, error) { return json.Marshal(&dbKey{ SrcPath: src, @@ -45,6 +39,15 @@ func makeDBKey(src, dst string) ([]byte, error) { }) } +func parseDBKey(key []byte) (*dbKey, error) { + dbkey := &dbKey{} + err := json.Unmarshal( + key, + dbkey, + ) + return dbkey, err +} + func makeDBValue(filename string, md5 bool) (*dbValue, error) { finfo, err := os.Stat(filename) if err != nil { @@ -120,16 +123,17 @@ func delDBValue(src, dst string) error { func delDBValues(srcPrefix, dstPrefix string) { dstPrefix = path.Join(session.Bucket, dstPrefix) - iter := db.NewIterator(nil, nil) + iter := db.NewIterator( + util.BytesPrefix([]byte("{")), + nil, + ) if ok := iter.First(); !ok { return } for { - k := new(dbKey) - key := iter.Key() - err := json.Unmarshal(key, k) + k, err := parseDBKey(iter.Key()) if err != nil { - PrintError("decode %s: %v", string(key), err) + PrintError("decode %s: %v", string(iter.Key()), err) } if strings.HasPrefix(k.SrcPath, srcPrefix) && strings.HasPrefix(k.DstPath, dstPrefix) { PrintOnlyVerbose("found %s => %s to delete", k.SrcPath, k.DstPath) @@ -182,9 +186,9 @@ func diffFileMetas(src []*fileMeta, dst []*fileMeta) []*fileMeta { } func initDB() (err error) { - db, err = leveldb.OpenFile(getDBName(), nil) + db, err = cache.GetClient() if err != nil { - Print("db %v %s", err, getDBName()) + Print("db %v %s", err, cache.GetDBName()) } return err } diff --git a/go.mod b/go.mod index 0645198..5097fa6 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.8.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a5cfb1b..84cea5f 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/partial/chunk.go b/partial/chunk.go index 1184a53..bb42e5c 100644 --- a/partial/chunk.go +++ b/partial/chunk.go @@ -21,15 +21,6 @@ type Chunk struct { buffer []byte } -func NewChunk(index, start, end int64) *Chunk { - chunk := &Chunk{ - start: start, - end: end, - index: index, - } - return chunk -} - func (p *Chunk) SetData(bytes []byte) { p.buffer = bytes } diff --git a/partial/downloader.go b/partial/downloader.go index 9491a01..db13687 100644 --- a/partial/downloader.go +++ b/partial/downloader.go @@ -8,7 +8,9 @@ import ( "sync" ) -const DefaultChunkSize = 1024 * 1024 * 10 +type Downloader interface { + Download() error +} type ChunkDownFunc func(start, end int64) ([]byte, error) @@ -31,7 +33,7 @@ type MultiPartialDownloader struct { downFunc ChunkDownFunc } -func NewMultiPartialDownloader(filePath string, finalSize, chunkSize int64, writer io.Writer, works int, fn ChunkDownFunc) *MultiPartialDownloader { +func NewMultiPartialDownloader(filePath string, finalSize, chunkSize int64, writer io.Writer, works int, fn ChunkDownFunc) Downloader { return &MultiPartialDownloader{ filePath: filePath, finalSize: finalSize, @@ -100,7 +102,12 @@ func (p *MultiPartialDownloader) Download() error { if end > p.finalSize { end = p.finalSize } - chunk := NewChunk(int64(j), start, end) + + chunk := &Chunk{ + index: int64(j), + start: start, + end: end, + } // 重试三次 for t := 0; t < 3; t++ { diff --git a/partial/uploader.go b/partial/uploader.go new file mode 100644 index 0000000..9f01652 --- /dev/null +++ b/partial/uploader.go @@ -0,0 +1,109 @@ +package partial + +import ( + "context" + "io" + + "golang.org/x/sync/errgroup" +) + +// 下载器接口 +type Uploader interface { + Upload() error +} + +// 具体的分片下载函数 +type ChunkUploadFunc func(partId int64, body []byte) error + +// 多线程分片上传器 +type MultiPartialUploader struct { + //分片大小 + chunkSize int64 + + // 本地文件 + reader io.Reader + + // 线程数 + works int + + // 上传函数 + handleFunc ChunkUploadFunc +} + +func NewMultiPartialUploader(chunkSize int64, reader io.Reader, works int, fn ChunkUploadFunc) Uploader { + if works <= 0 { + panic("multiPartialUploader works must > 0") + } + if chunkSize <= 0 { + panic("multiPartialUploader chunkSize must > 0") + } + + return &MultiPartialUploader{ + works: works, + chunkSize: chunkSize, + handleFunc: fn, + reader: reader, + } +} + +func (p *MultiPartialUploader) Upload() error { + chunkUploadTask := make(chan *Chunk, p.works) + + // 任务发布者 + // 从reader中读取分片大小的数据,提交到上传任务队列 + go func() { + var chunkIndex int64 = 0 + for { + // 已经上传完成则跳过 + buffer := make([]byte, p.chunkSize) + nRead, err := p.reader.Read(buffer) + + chunkUploadTask <- &Chunk{ + index: chunkIndex, + buffer: buffer[0:nRead], + start: p.chunkSize * chunkIndex, + end: p.chunkSize*chunkIndex + int64(nRead), + err: err, + } + if err != nil { + break + } + chunkIndex++ + } + close(chunkUploadTask) + }() + + // 上传任务到云端 + group, ctx := errgroup.WithContext(context.Background()) + for i := 0; i < p.works; i++ { + group.Go(func() error { + if err := p.uploadChunks(ctx, chunkUploadTask); err != nil { + return err + } + return nil + }) + } + return group.Wait() +} + +func (p *MultiPartialUploader) uploadChunks(ctx context.Context, channel <-chan *Chunk) error { + for { + select { + case <-ctx.Done(): + return nil + case chunk, ok := <-channel: + if !ok { + return nil + } + if chunk.err != nil { + if chunk.err == io.EOF { + return nil + } + return chunk.err + } + if err := p.handleFunc(chunk.index, chunk.buffer); err != nil { + return err + } + } + } +} diff --git a/partial/uploader_test.go b/partial/uploader_test.go new file mode 100644 index 0000000..60229d4 --- /dev/null +++ b/partial/uploader_test.go @@ -0,0 +1,61 @@ +package partial + +import ( + "bytes" + "errors" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUploaderError(t *testing.T) { + filedata := []byte(strings.Repeat("hello world", 1024*100)) + uploader := NewMultiPartialUploader( + 1, + bytes.NewReader(filedata), + 3, + func(partId int64, body []byte) error { + if partId > 20 { + return errors.New("error") + } + return nil + }, + ) + + err := uploader.Upload() + assert.Error(t, err) + assert.Equal(t, err, errors.New("error")) +} + +func TestUploader(t *testing.T) { + filedata := []byte(strings.Repeat("hello world", 24)) + + parts := make(map[int64][]byte) + var mutex sync.RWMutex + uploader := NewMultiPartialUploader( + 10, + bytes.NewReader(filedata), + 3, + func(partId int64, body []byte) error { + mutex.Lock() + parts[partId] = body + mutex.Unlock() + return nil + }, + ) + + err := uploader.Upload() + assert.NoError(t, err) + + // 组合结果 + var buffer bytes.Buffer + for i := 0; i <= len(filedata)/10; i++ { + d, ok := parts[int64(i)] + assert.Equal(t, ok, true) + buffer.Write(d) + } + + assert.Equal(t, filedata, buffer.Bytes()) +} diff --git a/session.go b/session.go index 411618c..e442df7 100644 --- a/session.go +++ b/session.go @@ -21,6 +21,7 @@ import ( "github.com/arrebole/progressbar" "github.com/fatih/color" "github.com/upyun/go-sdk/v3/upyun" + "github.com/upyun/upx/cache" "github.com/upyun/upx/fsutil" "github.com/upyun/upx/partial" ) @@ -370,7 +371,7 @@ func (sess *Session) getFileWithProgress(upPath, localPath string, upInfo *upyun downloader := partial.NewMultiPartialDownloader( localPath, upInfo.Size, - partial.DefaultChunkSize, + DefaultBlockSize, w, works, func(start, end int64) ([]byte, error) { @@ -491,35 +492,143 @@ func (sess *Session) GetStartBetweenEndFiles(upPath, localPath string, match *Ma } } -func (sess *Session) putFileWithProgress(localPath, upPath string, localInfo os.FileInfo) error { +func (sess *Session) putFileWithProgress(localPath, upPath string, localInfo os.FileInfo, workers int, resume bool) error { var err error fd, err := os.Open(localPath) if err != nil { return err } defer fd.Close() - cfg := &upyun.PutObjectConfig{ - Path: upPath, - Headers: map[string]string{ - "Content-Length": fmt.Sprint(localInfo.Size()), - }, - Reader: fd, - } + var bar *progressbar.ProgressBar if isVerbose { if localInfo.Size() > 0 { - bar := AddBar(upPath, int(localInfo.Size())) - cfg.Reader = &WrappedReader{r: fd, bar: bar} + bar = AddBar(upPath, int(localInfo.Size())) } } else { log.Printf("file: %s, Start\n", upPath) - if localInfo.Size() >= MinResumePutFileSize { - cfg.UseResumeUpload = true - cfg.ResumePartSize = DefaultBlockSize - cfg.MaxResumePutTries = DefaultResumeRetry + } + + // 如果文件大小超过指标,会进行分片多线程下载 + // 如果开启断续上传 + // 1. 查询是否有过断续上传记录 + // 2. 如果有断续上传记录,继续检测文件是否被修改 + // 如果文件被修改,则重新上传 + // 如果文件没有被修改,则跳过已经上传的分片 + if localInfo.Size() >= MinResumePutFileSize { + var offset int64 + var initResult *upyun.InitMultipartUploadResult + var skips = make(map[int64]bool) + if resume { + // 获取已经上传的内容 + cache.FindMutUpload(func(key string, entity *cache.MutUpload) bool { + if entity.UpPath == upPath && entity.Path == localPath && entity.Size == localInfo.Size() { + initResult = &upyun.InitMultipartUploadResult{ + UploadID: entity.UploadID, + Path: entity.Path, + PartSize: entity.PartSize, + } + } + return false + }) + // 如果存在已经上传,则恢复 + // 查询已经上传的分片, 设置进度条偏移量,如果分片id已经被记录,则跳过上传 + if initResult != nil { + cache.FindMutUploadPart(func(key string, entity *cache.MutUploadPart) bool { + if entity.UploadID == initResult.UploadID { + if _, ok := skips[entity.PartId]; !ok { + offset += entity.Len + } + skips[entity.PartId] = true + } + return false + }) + } + } + + // 如果没有历史的分片任务,则创建分片任务 + if initResult == nil { + initResult, err = sess.updriver.InitMultipartUpload(&upyun.InitMultipartUploadConfig{ + Path: upPath, + PartSize: DefaultBlockSize, + }) + if err != nil { + return err + } + cache.AddMutUpload(&cache.MutUpload{ + UploadID: initResult.UploadID, + Path: localPath, + UpPath: upPath, + PartSize: initResult.PartSize, + Size: localInfo.Size(), + CreateAt: time.Now(), + }) + } + + // 设置已经上传成功的偏移量 + if bar != nil { + bar.SetOffset64(offset) + } + + // 上传分片任务 + fmt.Println(skips) + uploader := partial.NewMultiPartialUploader( + DefaultBlockSize, + fd, + workers, + func(partId int64, body []byte) error { + if _, ok := skips[partId]; ok { + return nil + } + err := sess.updriver.UploadPart(initResult, &upyun.UploadPartConfig{ + PartID: int(partId), + Reader: bytes.NewReader(body), + PartSize: int64(len(body)), + }) + if err != nil && err.(*upyun.Error).Code == 40011061 { + err = nil + } + + // 记录分片上传完成 + if err == nil { + if bar != nil { + bar.Add(len(body)) + } + cache.AddMutUploadPart(&cache.MutUploadPart{ + UploadID: initResult.UploadID, + PartId: partId, + Len: int64(len(body)), + }) + } + return err + }, + ) + + if err = uploader.Upload(); err != nil { + return err } + + // 完成分片上传 + err = sess.updriver.CompleteMultipartUpload(initResult, nil) + if err != nil { + return err + } + // 上传完成删除记录 + cache.DeleteByUploadID(initResult.UploadID) + } else { + cfg := &upyun.PutObjectConfig{ + Path: upPath, + Headers: map[string]string{ + "Content-Length": fmt.Sprint(localInfo.Size()), + }, + Reader: fd, + } + if bar != nil { + cfg.Reader = &WrappedReader{r: fd, bar: bar} + } + err = sess.updriver.Put(cfg) } - err = sess.updriver.Put(cfg) + if !isVerbose { log.Printf("file: %s, Done\n", upPath) } @@ -572,7 +681,7 @@ func (sess *Session) putRemoteFileWithProgress(rawURL, upPath string) error { return nil } -func (sess *Session) putFilesWitchProgress(localFiles []*UploadedFile, workers int) { +func (sess *Session) putFilesWitchProgress(localFiles []*UploadedFile, workers int, resume bool) { var wg sync.WaitGroup tasks := make(chan *UploadedFile, workers*2) @@ -585,6 +694,8 @@ func (sess *Session) putFilesWitchProgress(localFiles []*UploadedFile, workers i task.LocalPath, task.UpPath, task.LocalInfo, + 1, + resume, ) if err != nil { fmt.Println("putFileWithProgress error: ", err.Error()) @@ -602,7 +713,7 @@ func (sess *Session) putFilesWitchProgress(localFiles []*UploadedFile, workers i wg.Wait() } -func (sess *Session) putDir(localPath, upPath string, workers int, withIgnore bool) { +func (sess *Session) putDir(localPath, upPath string, workers int, withIgnore bool, resume bool) { localAbsPath, err := filepath.Abs(localPath) if err != nil { PrintErrorAndExit(err.Error()) @@ -632,7 +743,7 @@ func (sess *Session) putDir(localPath, upPath string, workers int, withIgnore bo if fInfo, err := os.Stat(info.fpath); err == nil && fInfo.IsDir() { err = sess.updriver.Mkdir(desPath) } else { - err = sess.putFileWithProgress(info.fpath, desPath, info.fInfo) + err = sess.putFileWithProgress(info.fpath, desPath, info.fInfo, 1, resume) } if err != nil { return @@ -663,7 +774,7 @@ func (sess *Session) putDir(localPath, upPath string, workers int, withIgnore bo } // / Put 上传单文件或单目录 -func (sess *Session) Put(localPath, upPath string, workers int, withIgnore bool) { +func (sess *Session) Put(localPath, upPath string, workers int, withIgnore bool, resume bool) { upPath = sess.AbsPath(upPath) exist, isDir := false, false @@ -714,17 +825,17 @@ func (sess *Session) Put(localPath, upPath string, workers int, withIgnore bool) upPath = path.Join(upPath, filepath.Base(localPath)) } } - sess.putDir(localPath, upPath, workers, withIgnore) + sess.putDir(localPath, upPath, workers, withIgnore, resume) } else { if isDir { upPath = path.Join(upPath, filepath.Base(localPath)) } - sess.putFileWithProgress(localPath, upPath, localInfo) + sess.putFileWithProgress(localPath, upPath, localInfo, workers, resume) } } // put 的升级版命令, 支持多文件上传 -func (sess *Session) Upload(filenames []string, upPath string, workers int, withIgnore bool) { +func (sess *Session) Upload(filenames []string, upPath string, workers int, withIgnore bool, resume bool) { upPath = sess.AbsPath(upPath) // 检测云端的目的地目录 @@ -733,6 +844,7 @@ func (sess *Session) Upload(filenames []string, upPath string, workers int, with upPathExist = true upPathIsDir = upInfo.IsDir } + // 多文件上传 upPath 如果存在则只能是目录 if upPathExist && !upPathIsDir { PrintErrorAndExit("upload: %s: Not a directory", upPath) @@ -767,11 +879,12 @@ func (sess *Session) Upload(filenames []string, upPath string, workers int, with path.Join(upPath, filepath.Base(localPath)), workers, withIgnore, + resume, ) } // 上传文件 - sess.putFilesWitchProgress(uploadedFile, workers) + sess.putFilesWitchProgress(uploadedFile, workers, resume) } func (sess *Session) rm(fpath string, isAsync bool, isFolder bool) { @@ -1157,6 +1270,7 @@ func (sess *Session) Sync(localPath, upPath string, workers int, delete, strong } } } + func (sess *Session) PostTask(app, notify, taskFile string) { fd, err := os.Open(taskFile) if err != nil { From c98b4417470dd3d914cdfcf419f15d27596aab34 Mon Sep 17 00:00:00 2001 From: arrebole Date: Thu, 25 May 2023 16:15:03 +0800 Subject: [PATCH 2/2] =?UTF-8?q?chore:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E7=89=87=E4=B8=8A=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 超时时间默认24小时 - 加速分片记录的删除 --- cache/upload.go | 33 +++++++-------------------------- session.go | 3 +-- 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/cache/upload.go b/cache/upload.go index 5febb50..4b319bc 100644 --- a/cache/upload.go +++ b/cache/upload.go @@ -1,7 +1,6 @@ package cache import ( - "crypto/md5" "encoding/json" "fmt" "time" @@ -28,18 +27,7 @@ type MutUpload struct { } func (p *MutUpload) Key() string { - fingerprint := fmt.Sprintf( - "%s:%s:%d:%d", - p.Path, - p.UpPath, - p.Size, - p.PartSize, - ) - - return fmt.Sprintf( - "mutupload-%x", - md5.Sum([]byte(fingerprint)), - ) + return fmt.Sprintf("mutupload-%s", p.UpPath) } // 查询分片上传任务 @@ -53,7 +41,7 @@ func FindMutUpload(fn func(key string, entity *MutUpload) bool) ([]*MutUpload, e } // 删除过期的分片上传记录 - if time.Since(item.CreateAt).Hours() > 12 { + if time.Since(item.CreateAt).Hours() > 24 { FindMutUploadPart(func(key string, part *MutUploadPart) bool { if part.UploadID == item.UploadID { db.Delete([]byte(key), nil) @@ -124,22 +112,15 @@ func AddMutUploadPart(entity *MutUploadPart) error { if err != nil { return err } - return db.Put([]byte(entity.Key()), data, nil) } -func DeleteByUploadID(uploadID string) error { - FindMutUpload(func(key string, entity *MutUpload) bool { - if entity.UploadID == uploadID { - Delete(key) - } - return false +func DeleteUpload(upPath, uploadID string) error { + Range("mutupload-"+upPath, func(key, data []byte) { + Delete(string(key)) }) - FindMutUploadPart(func(key string, entity *MutUploadPart) bool { - if entity.UploadID == uploadID { - Delete(key) - } - return false + Range("part-"+uploadID, func(key, data []byte) { + Delete(string(key)) }) return nil } diff --git a/session.go b/session.go index e442df7..bdce046 100644 --- a/session.go +++ b/session.go @@ -571,7 +571,6 @@ func (sess *Session) putFileWithProgress(localPath, upPath string, localInfo os. } // 上传分片任务 - fmt.Println(skips) uploader := partial.NewMultiPartialUploader( DefaultBlockSize, fd, @@ -614,7 +613,7 @@ func (sess *Session) putFileWithProgress(localPath, upPath string, localInfo os. return err } // 上传完成删除记录 - cache.DeleteByUploadID(initResult.UploadID) + cache.DeleteUpload(upPath, initResult.UploadID) } else { cfg := &upyun.PutObjectConfig{ Path: upPath,