Skip to content

Commit

Permalink
multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
huangnauh committed Oct 28, 2024
1 parent 14076d5 commit 9fa7c61
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 91 deletions.
105 changes: 45 additions & 60 deletions upyun/io.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,63 @@
package upyun

import (
"bytes"
"crypto/md5"
"fmt"
"io"
"os"
)

type UpYunPutReader interface {
Len() (n int)
MD5() (ret string)
Read([]byte) (n int, err error)
Copyed() (n int)
}

type fragmentFile struct {
realFile *os.File
offset int64
limit int64
cursor int64
// Chunk is one sequential piece of a multipart upload. It implements
// UpYunPutReader-style reading over a bounded view of the source stream.
type Chunk struct {
	// buf is the data source for this chunk, typically an io.LimitReader
	// over the shared upload stream.
	buf io.Reader
	// buf2, when non-nil, holds bytes already drained from buf by MD5()
	// so that Read can replay the same data afterwards.
	buf2 *bytes.Buffer
	// id is the zero-based sequence number of the chunk.
	id int
	// n is the intended length of the chunk in bytes.
	n int
}

func (f *fragmentFile) Seek(offset int64, whence int) (ret int64, err error) {
switch whence {
case 0:
f.cursor = offset
ret, err = f.realFile.Seek(f.offset+f.cursor, 0)
return ret - f.offset, err
default:
return 0, fmt.Errorf("whence must be 0")
func (c *Chunk) Read(b []byte) (n int, err error) {
if c.buf2 != nil {
return c.buf2.Read(b)
}
}

func (f *fragmentFile) Read(b []byte) (n int, err error) {
if f.cursor >= f.limit {
return 0, io.EOF
}
n, err = f.realFile.Read(b)
if f.cursor+int64(n) > f.limit {
n = int(f.limit - f.cursor)
}
f.cursor += int64(n)
return n, err
}

func (f *fragmentFile) Stat() (fInfo os.FileInfo, err error) {
return fInfo, fmt.Errorf("fragmentFile not implement Stat()")
}

func (f *fragmentFile) Close() error {
return nil
}

func (f *fragmentFile) Copyed() int {
return int(f.cursor - f.offset)
}

func (f *fragmentFile) Len() int {
return int(f.limit - f.offset)
}

func (f *fragmentFile) MD5() string {
s, _ := md5File(f)
return s
}

func newFragmentFile(file *os.File, offset, limit int64) (*fragmentFile, error) {
f := &fragmentFile{
realFile: file,
offset: offset,
limit: limit,
}

if _, err := f.Seek(0, 0); err != nil {
return nil, err
return c.buf.Read(b)
}

// Len reports the intended size of the chunk in bytes, fixed at creation
// time (not the number of bytes read so far).
func (c *Chunk) Len() int {
	return c.n
}
// ID returns the zero-based sequence number of the chunk within the upload.
func (c *Chunk) ID() int {
	return c.id
}

// MD5 drains the chunk's underlying reader, buffering every byte so that a
// subsequent Read replays the identical data, and returns the hex-encoded
// MD5 digest of the chunk contents. Read errors are deliberately ignored,
// preserving best-effort behavior.
func (c *Chunk) MD5() string {
	replay := bytes.NewBuffer(nil)
	digest := md5.New()
	// Single pass over the source, feeding both the hash and the replay buffer.
	_, _ = io.Copy(io.MultiWriter(digest, replay), c.buf)
	c.buf2 = replay
	return fmt.Sprintf("%x", digest.Sum(nil))
}

// GetReadChunk slices the first size bytes of input into sequential chunks
// of at most partSize bytes each, sends them on ch, and closes ch when done
// (including when size <= 0, so receivers always unblock).
//
// Chunks are lazy io.LimitReader views over the shared input: the receiver
// must fully consume chunk k before reading chunk k+1. Reading chunks out of
// order or concurrently would interleave their data.
//
// NOTE(review): a partSize <= 0 with size > 0 would loop forever producing
// zero-length chunks — callers are expected to pass a positive partSize.
func GetReadChunk(input io.Reader, size, partSize int64, ch chan *Chunk) {
	id := 0
	bytesLeft := size
	for bytesLeft > 0 {
		// The final chunk may be shorter than partSize.
		n := partSize
		if bytesLeft <= partSize {
			n = bytesLeft
		}
		ch <- &Chunk{
			buf: io.LimitReader(input, n),
			id:  id,
			n:   int(n),
		}
		id++
		bytesLeft -= n
	}
	close(ch)
}
105 changes: 74 additions & 31 deletions upyun/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type GetRequestConfig struct {
Headers map[string]string
}

type ProxyReader func(size, offset int64, r io.Reader) io.Reader

// PutObjectConfig provides a configuration to Put method.
type PutObjectConfig struct {
Path string
Expand All @@ -91,6 +93,7 @@ type PutObjectConfig struct {
// AppendContent bool
ResumePartSize int64
MaxResumePutTries int
ProxyReader ProxyReader
}

type MoveObjectConfig struct {
Expand Down Expand Up @@ -278,6 +281,34 @@ func getPartInfo(partSize, fsize int64) (int64, int64, error) {
return partSize, partNum, nil
}

// getMultipartUploadProcess resumes an in-flight ordered multipart upload
// for config.Path when one exists; otherwise it initializes a fresh ordered
// multipart upload of fsize bytes and returns its starting process state.
func (up *UpYun) getMultipartUploadProcess(config *PutObjectConfig, fsize int64) (*ResumeProcessResult, error) {
	// Best-effort lookup: a failed query simply falls through to a fresh init.
	if existing, _ := up.GetResumeProcess(config.Path); existing != nil && existing.Order {
		return existing, nil
	}

	initResult, err := up.InitMultipartUpload(&InitMultipartUploadConfig{
		Path:          config.Path,
		ContentLength: fsize,
		PartSize:      config.ResumePartSize,
		ContentType:   config.Headers["Content-Type"],
		OrderUpload:   true,
	})
	if err != nil {
		return nil, err
	}

	return &ResumeProcessResult{
		UploadID:     initResult.UploadID,
		Path:         config.Path,
		NextPartID:   0,
		NextPartSize: config.ResumePartSize,
		Parts:        make([]*DisorderPart, 0),
	}, nil
}

func (up *UpYun) resumePut(config *PutObjectConfig) error {
f, ok := config.Reader.(*os.File)
if !ok {
Expand All @@ -301,7 +332,6 @@ func (up *UpYun) resumePut(config *PutObjectConfig) error {
if config.Headers == nil {
config.Headers = make(map[string]string)
}
headers := config.Headers

var breakpoint *BreakPointConfig
if up.Recorder != nil {
Expand All @@ -310,23 +340,16 @@ func (up *UpYun) resumePut(config *PutObjectConfig) error {
// first upload or file has expired
maxPartID := int((fsize+config.ResumePartSize-1)/config.ResumePartSize - 1)

var uploadInfo *InitMultipartUploadResult
if breakpoint == nil || isRecordExpired(fileinfo, breakpoint) {
uploadInfo, err = up.InitMultipartUpload(&InitMultipartUploadConfig{
Path: config.Path,
PartSize: config.ResumePartSize,
ContentType: headers["Content-Type"],
ContentLength: fsize,
OrderUpload: true,
})
uploadProcess, err := up.getMultipartUploadProcess(config, fsize)
if err != nil {
return err
}

breakpoint = &BreakPointConfig{
UploadID: uploadInfo.UploadID,
PartSize: uploadInfo.PartSize,
PartID: 0,
UploadID: uploadProcess.UploadID,
PartSize: uploadProcess.NextPartSize,
PartID: int(uploadProcess.NextPartID),
}

if up.Recorder != nil {
Expand Down Expand Up @@ -590,6 +613,21 @@ func (up *UpYun) GetRequest(config *GetRequestConfig) (*http.Response, error) {
return resp, nil
}

// GetInfoWithHeaders issues a HEAD request for path, forwarding the caller's
// extra request headers, and returns the file metadata parsed from the
// response headers.
func (up *UpYun) GetInfoWithHeaders(path string, headers map[string]string) (*FileInfo, error) {
	cfg := &restReqConfig{
		method:    "HEAD",
		uri:       path,
		headers:   headers,
		closeBody: true,
	}
	resp, err := up.doRESTRequest(cfg)
	if err != nil {
		return nil, errorOperation("get info", err)
	}
	info := parseHeaderToFileInfo(resp.Header, true)
	info.Name = path
	return info, nil
}

func (up *UpYun) GetInfo(path string) (*FileInfo, error) {
resp, err := up.doRESTRequest(&restReqConfig{
method: "HEAD",
Expand Down Expand Up @@ -876,21 +914,21 @@ type BreakPointConfig struct {
LastTime time.Time
}

func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPointConfig, f *os.File, fileInfo fs.FileInfo) (*BreakPointConfig, error) {
func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPointConfig, f io.Reader, fileInfo fs.FileInfo) (*BreakPointConfig, error) {
fsize := fileInfo.Size()
maxPartID := int((fsize+config.ResumePartSize-1)/config.ResumePartSize - 1)
partID := breakpoint.PartID
curSize, partSize := int64(partID)*breakpoint.PartSize, breakpoint.PartSize

for id := partID; id <= maxPartID; id++ {
if curSize+partSize > fsize {
partSize = fsize - curSize
}
fragFile, err := newFragmentFile(f, curSize, partSize)
if err != nil {
return breakpoint, errorOperation("new fragment file", err)
}

bytesLeft := fsize - curSize
ch := make(chan *Chunk, 1)
var err error
var reader io.Reader
if config.ProxyReader != nil {
reader = config.ProxyReader(fsize, curSize, f)
} else {
reader = f
}
go GetReadChunk(reader, bytesLeft, partSize, ch)
for chunk := range ch {
try := 0
for ; config.MaxResumePutTries == 0 || try < config.MaxResumePutTries; try++ {
err = up.UploadPart(
Expand All @@ -900,28 +938,26 @@ func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPoin
PartSize: breakpoint.PartSize,
},
&UploadPartConfig{
PartID: id,
PartSize: partSize,
Reader: fragFile,
PartID: partID + chunk.ID(),
PartSize: int64(chunk.Len()),
Reader: chunk,
})
if err == nil {
break
}
}

if config.MaxResumePutTries > 0 && try == config.MaxResumePutTries {
breakpoint.PartID = id
breakpoint.PartID = partID + chunk.ID()
breakpoint.FileSize = fsize
breakpoint.LastTime = time.Now()
breakpoint.FileModTime = fileInfo.ModTime()

if up.Recorder != nil {
up.Recorder.Set(config.Path, breakpoint)
}
return breakpoint, err
}
curSize += partSize
breakpoint.PartID = id + 1
breakpoint.PartID = partID + chunk.ID() + 1
}
return breakpoint, nil
}
Expand All @@ -940,6 +976,7 @@ type ResumeDisorderResult struct {
type ResumeProcessResult struct {
UploadID string
Path string
Order bool
NextPartSize int64
NextPartID int64
Parts []*DisorderPart
Expand All @@ -965,6 +1002,11 @@ func (up *UpYun) GetResumeProcess(path string) (*ResumeProcessResult, error) {
partSizeStr := resp.Header.Get("X-Upyun-Next-Part-Size")
partIDStr := resp.Header.Get("X-Upyun-Next-Part-Id")
uploadID := resp.Header.Get("X-Upyun-Multi-Uuid")
order := resp.Header.Get("X-Upyun-Meta-Order")
o := true
if order == "false" {
o = false
}

if partSizeStr != "" {
partSize, err = strconv.ParseInt(partSizeStr, 10, 64)
Expand Down Expand Up @@ -995,6 +1037,7 @@ func (up *UpYun) GetResumeProcess(path string) (*ResumeProcessResult, error) {
NextPartSize: partSize,
NextPartID: partID,
Path: path,
Order: o,
Parts: disorderRes.Parts,
}, nil
}

0 comments on commit 9fa7c61

Please sign in to comment.