Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 增加多线程分片上传和断续上传 #94

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cache

import (
"os"
"path/filepath"
"runtime"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

// db holds the connection to the local database. It starts out nil and is
// assigned by GetClient the first time the database is opened.
var db *leveldb.DB

// GetDBName returns the path of the local database, a ".upx.db" entry inside
// the directory named by the USERPROFILE environment variable on Windows and
// by HOME on every other platform.
func GetDBName() string {
	homeVar := "HOME"
	if runtime.GOOS == "windows" {
		homeVar = "USERPROFILE"
	}
	return filepath.Join(os.Getenv(homeVar), ".upx.db")
}

// GetClient returns the shared database handle, opening the file named by
// GetDBName on the first call (and again on later calls for as long as the
// handle is still nil). The open error, if any, is returned alongside
// whatever handle the open produced.
func GetClient() (*leveldb.DB, error) {
	if db != nil {
		return db, nil
	}

	var openErr error
	db, openErr = leveldb.OpenFile(GetDBName(), nil)
	return db, openErr
}

// Delete removes the record stored under key from the local database.
// It returns the error from obtaining the database handle, or otherwise the
// result of the delete itself.
func Delete(key string) error {
	client, err := GetClient()
	if err == nil {
		err = client.Delete([]byte(key), nil)
	}
	return err
}

// Range calls fn once for every record whose key begins with the prefix
// scoop, in the order the iterator yields them.
//
// The key and data slices passed to fn come straight from the iterator;
// NOTE(review): per the goleveldb iterator contract they are presumably only
// valid until the iterator advances, so fn should copy anything it keeps.
//
// It returns the error from obtaining the database handle, or any error the
// iterator accumulated while scanning.
func Range(scoop string, fn func(key []byte, data []byte)) error {
	client, err := GetClient()
	if err != nil {
		return err
	}

	iter := client.NewIterator(util.BytesPrefix([]byte(scoop)), nil)
	// Deferred so the iterator is released even if fn panics.
	defer iter.Release()

	for iter.Next() {
		fn(iter.Key(), iter.Value())
	}
	return iter.Error()
}
126 changes: 126 additions & 0 deletions cache/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package cache

import (
"encoding/json"
"fmt"
"time"
)

// MutUpload describes one multipart upload task.
type MutUpload struct {
	// UploadID identifies the upload task.
	UploadID string

	// Size is the total size of the file.
	Size int64

	// PartSize is the size of a single part.
	PartSize int64

	// Path is the local file path.
	Path string

	// UpPath is the file path on the cloud side.
	UpPath string

	// CreateAt is the upload time.
	CreateAt time.Time
}

// Key returns the database key of the task: "mutupload-" followed by UpPath.
func (u *MutUpload) Key() string {
	const layout = "mutupload-%s"
	return fmt.Sprintf(layout, u.UpPath)
}

// FindMutUpload scans every stored multipart upload task (keys starting with
// "mutupload-") and returns the tasks for which fn reports true. fn receives
// the record's database key and the decoded task.
//
// Two kinds of records are purged during the scan and never passed to fn:
// records that fail to decode, and tasks created more than 24 hours ago
// (together with their part records).
//
// The returned error is the one reported by Range.
func FindMutUpload(fn func(key string, entity *MutUpload) bool) ([]*MutUpload, error) {
	var result []*MutUpload
	err := Range("mutupload-", func(key []byte, value []byte) {
		item := &MutUpload{}
		if err := json.Unmarshal(value, item); err != nil {
			// Clean-up is best effort; a failed delete is retried on the next scan.
			_ = db.Delete(key, nil)
			return
		}

		// Remove expired multipart upload records.
		if time.Since(item.CreateAt) > 24*time.Hour {
			// The callback always returns false: only the deletions matter here,
			// not the collected result.
			_, _ = FindMutUploadPart(func(partKey string, part *MutUploadPart) bool {
				if part.UploadID == item.UploadID {
					_ = db.Delete([]byte(partKey), nil)
				}
				return false
			})
			_ = db.Delete(key, nil)
			// The task no longer exists, so it must not be offered to fn or
			// returned to the caller.
			return
		}

		if fn(string(key), item) {
			result = append(result, item)
		}
	})
	return result, err
}

// AddMutUpload stores a multipart upload task in the local database as JSON
// under entity.Key(), overwriting any record already stored under that key.
// It returns the first error hit while obtaining the database handle,
// encoding the task, or writing the record.
func AddMutUpload(entity *MutUpload) error {
	client, openErr := GetClient()
	if openErr != nil {
		return openErr
	}

	payload, encodeErr := json.Marshal(entity)
	if encodeErr != nil {
		return encodeErr
	}

	recordKey := []byte(entity.Key())
	return client.Put(recordKey, payload, nil)
}

// MutUploadPart holds the details of one part belonging to a multipart
// upload task.
type MutUploadPart struct {
	UploadID string
	PartId   int64
	Len      int64
}

// Key returns the database key of the part: "part-", the UploadID, "-", and
// the PartId in decimal.
func (u *MutUploadPart) Key() string {
	const layout = "part-%s-%d"
	return fmt.Sprintf(layout, u.UploadID, u.PartId)
}

// FindMutUploadPart scans every stored part record (keys starting with
// "part-") and returns those for which fn reports true. fn receives the
// record's database key and the decoded part. Records that fail to decode
// are deleted and skipped. The returned error is the one reported by Range.
func FindMutUploadPart(fn func(key string, entity *MutUploadPart) bool) ([]*MutUploadPart, error) {
	var matched []*MutUploadPart

	visit := func(rawKey []byte, rawValue []byte) {
		part := new(MutUploadPart)
		if json.Unmarshal(rawValue, part) != nil {
			db.Delete(rawKey, nil)
			return
		}
		if !fn(string(rawKey), part) {
			return
		}
		matched = append(matched, part)
	}

	scanErr := Range("part-", visit)
	return matched, scanErr
}

// AddMutUploadPart records an uploaded part in the local database as JSON
// under entity.Key(), overwriting any record already stored under that key.
// It returns the first error hit while obtaining the database handle,
// encoding the part, or writing the record.
func AddMutUploadPart(entity *MutUploadPart) error {
	client, openErr := GetClient()
	if openErr != nil {
		return openErr
	}

	payload, encodeErr := json.Marshal(entity)
	if encodeErr != nil {
		return encodeErr
	}

	recordKey := []byte(entity.Key())
	return client.Put(recordKey, payload, nil)
}

func DeleteUpload(upPath, uploadID string) error {
Range("mutupload-"+upPath, func(key, data []byte) {
Delete(string(key))
})
Range("part-"+uploadID, func(key, data []byte) {
Delete(string(key))
})
return nil
}
66 changes: 66 additions & 0 deletions cache/upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestMutUpload stores two upload tasks and checks that FindMutUpload returns
// exactly one record for the key of the first.
func TestMutUpload(t *testing.T) {
	mutUpload := &MutUpload{
		UploadID: "1",
		Size:     100 * 12,
		PartSize: 100,
		Path:     "a.jpg",
		UpPath:   "b.jpg",
		CreateAt: time.Now(),
	}
	assert.NoError(t, AddMutUpload(mutUpload))
	// Same UpPath as above, hence the same key: this write replaces the first
	// record instead of adding a second one.
	assert.NoError(t, AddMutUpload(&MutUpload{
		UploadID: "2",
		Size:     100 * 12,
		PartSize: 100,
		Path:     "/c/a.jpg",
		UpPath:   "b.jpg",
		CreateAt: time.Now(),
	}))
	results, err := FindMutUpload(func(key string, entity *MutUpload) bool {
		return key == mutUpload.Key()
	})

	assert.NoError(t, err)
	// Stop here on a length mismatch: indexing results[0] on an empty slice
	// would panic instead of reporting a test failure.
	if !assert.Len(t, results, 1) {
		return
	}
	// testify expects (expected, actual).
	assert.Equal(t, mutUpload.Key(), results[0].Key())
}

func TestMutUploadPart(t *testing.T) {
part1s := []int64{}
for i := 0; i < 100; i++ {
part1s = append(part1s, int64(i))
}

for _, v := range part1s {
err := AddMutUploadPart(&MutUploadPart{
UploadID: "1",
PartId: v,
Len: 100,
})
assert.NoError(t, err)
}

part2s := []int64{}
records, err := FindMutUploadPart(func(key string, entity *MutUploadPart) bool {
return entity.UploadID == "1"
})
assert.NoError(t, err)
for _, v := range records {
part2s = append(part2s, v.PartId)
}

assert.ElementsMatch(t, part1s, part2s)
}
4 changes: 4 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,12 @@ func NewPutCommand() cli.Command {
upPath,
c.Int("w"),
c.Bool("all"),
c.Bool("c"),
)
return nil
},
Flags: []cli.Flag{
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
},
Expand All @@ -373,10 +375,12 @@ func NewUploadCommand() cli.Command {
c.String("remote"),
c.Int("w"),
c.Bool("all"),
c.Bool("c"),
)
return nil
},
Flags: []cli.Flag{
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.StringFlag{Name: "remote", Usage: "remote path", Value: "./"},
Expand Down
34 changes: 19 additions & 15 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strings"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/upyun/upx/cache"
)

var db *leveldb.DB
Expand All @@ -31,20 +32,22 @@ type dbValue struct {
Items []*fileMeta `json:"items"`
}

// getDBName returns the path of the local database, a ".upx.db" entry inside
// the directory named by USERPROFILE on Windows and by HOME elsewhere.
func getDBName() string {
	var home string
	switch runtime.GOOS {
	case "windows":
		home = os.Getenv("USERPROFILE")
	default:
		home = os.Getenv("HOME")
	}
	return filepath.Join(home, ".upx.db")
}

// makeDBKey builds the JSON-encoded database key for a source/destination
// pair. The destination is joined onto the current session's bucket before
// encoding.
func makeDBKey(src, dst string) ([]byte, error) {
	key := dbKey{
		SrcPath: src,
		DstPath: path.Join(session.Bucket, dst),
	}
	return json.Marshal(&key)
}

// parseDBKey decodes a JSON-encoded database key. The returned pointer is
// never nil, even when decoding fails; check the error before trusting its
// fields.
func parseDBKey(key []byte) (*dbKey, error) {
	var parsed dbKey
	err := json.Unmarshal(key, &parsed)
	return &parsed, err
}

func makeDBValue(filename string, md5 bool) (*dbValue, error) {
finfo, err := os.Stat(filename)
if err != nil {
Expand Down Expand Up @@ -120,16 +123,17 @@ func delDBValue(src, dst string) error {

func delDBValues(srcPrefix, dstPrefix string) {
dstPrefix = path.Join(session.Bucket, dstPrefix)
iter := db.NewIterator(nil, nil)
iter := db.NewIterator(
util.BytesPrefix([]byte("{")),
nil,
)
if ok := iter.First(); !ok {
return
}
for {
k := new(dbKey)
key := iter.Key()
err := json.Unmarshal(key, k)
k, err := parseDBKey(iter.Key())
if err != nil {
PrintError("decode %s: %v", string(key), err)
PrintError("decode %s: %v", string(iter.Key()), err)
}
if strings.HasPrefix(k.SrcPath, srcPrefix) && strings.HasPrefix(k.DstPath, dstPrefix) {
PrintOnlyVerbose("found %s => %s to delete", k.SrcPath, k.DstPath)
Expand Down Expand Up @@ -182,9 +186,9 @@ func diffFileMetas(src []*fileMeta, dst []*fileMeta) []*fileMeta {
}

func initDB() (err error) {
db, err = leveldb.OpenFile(getDBName(), nil)
db, err = cache.GetClient()
if err != nil {
Print("db %v %s", err, getDBName())
Print("db %v %s", err, cache.GetDBName())
}
return err
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
9 changes: 0 additions & 9 deletions partial/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ type Chunk struct {
buffer []byte
}

// NewChunk returns a Chunk with the given index, start and end. No other
// field is set.
func NewChunk(index, start, end int64) *Chunk {
	return &Chunk{
		index: index,
		start: start,
		end:   end,
	}
}

// SetData sets the chunk's buffer to bytes. The slice is stored as given,
// not copied, so later writes through the caller's slice are visible via the
// chunk.
func (p *Chunk) SetData(bytes []byte) {
p.buffer = bytes
}
Expand Down
Loading