From 17cf646be3e1d103da735fb8c688100c5f360cbf Mon Sep 17 00:00:00 2001 From: Libo Huang Date: Mon, 28 Oct 2024 14:21:47 +0800 Subject: [PATCH] TODO --- upyun/io.go | 105 ++++++++++++++++++++++---------------------------- upyun/rest.go | 82 +++++++++++++++++++++++++-------------- 2 files changed, 98 insertions(+), 89 deletions(-) diff --git a/upyun/io.go b/upyun/io.go index b0b865f..612016a 100644 --- a/upyun/io.go +++ b/upyun/io.go @@ -1,78 +1,63 @@ package upyun import ( + "bytes" + "crypto/md5" "fmt" "io" - "os" ) type UpYunPutReader interface { Len() (n int) MD5() (ret string) Read([]byte) (n int, err error) - Copyed() (n int) } -type fragmentFile struct { - realFile *os.File - offset int64 - limit int64 - cursor int64 +type Chunk struct { + buf io.Reader + buf2 *bytes.Buffer + id int + n int } -func (f *fragmentFile) Seek(offset int64, whence int) (ret int64, err error) { - switch whence { - case 0: - f.cursor = offset - ret, err = f.realFile.Seek(f.offset+f.cursor, 0) - return ret - f.offset, err - default: - return 0, fmt.Errorf("whence must be 0") +func (c *Chunk) Read(b []byte) (n int, err error) { + if c.buf2 != nil { + return c.buf2.Read(b) } -} - -func (f *fragmentFile) Read(b []byte) (n int, err error) { - if f.cursor >= f.limit { - return 0, io.EOF - } - n, err = f.realFile.Read(b) - if f.cursor+int64(n) > f.limit { - n = int(f.limit - f.cursor) - } - f.cursor += int64(n) - return n, err -} - -func (f *fragmentFile) Stat() (fInfo os.FileInfo, err error) { - return fInfo, fmt.Errorf("fragmentFile not implement Stat()") -} - -func (f *fragmentFile) Close() error { - return nil -} - -func (f *fragmentFile) Copyed() int { - return int(f.cursor - f.offset) -} - -func (f *fragmentFile) Len() int { - return int(f.limit - f.offset) -} - -func (f *fragmentFile) MD5() string { - s, _ := md5File(f) - return s -} - -func newFragmentFile(file *os.File, offset, limit int64) (*fragmentFile, error) { - f := &fragmentFile{ - realFile: file, - offset: offset, - limit: limit, - } - - if _, err := f.Seek(0, 0); err != nil { - return nil, err + return c.buf.Read(b) +} + +func (c *Chunk) Len() int { + return c.n +} +func (c *Chunk) ID() int { + return c.id +} + +func (c *Chunk) MD5() string { + c.buf2 = bytes.NewBuffer(nil) + reader := io.TeeReader(c.buf, c.buf2) + hash := md5.New() + _, _ = io.Copy(hash, reader) + return fmt.Sprintf("%x", hash.Sum(nil)) +} + +func GetReadChunk(input io.Reader, size, partSize int64, ch chan *Chunk) { + id := 0 + bytesLeft := size + for bytesLeft > 0 { + n := partSize + if bytesLeft <= partSize { + n = bytesLeft + } + reader := io.LimitReader(input, n) + ch <- &Chunk{ + buf: reader, + id: id, + n: int(n), + } + id++ + bytesLeft -= n } - return f, nil + close(ch) } diff --git a/upyun/rest.go b/upyun/rest.go index fd92878..d374f91 100644 --- a/upyun/rest.go +++ b/upyun/rest.go @@ -79,6 +79,8 @@ type GetRequestConfig struct { Headers map[string]string } +type ProxyReader func(offset int64, r io.Reader) io.ReadCloser + // PutObjectConfig provides a configuration to Put method. type PutObjectConfig struct { Path string @@ -91,6 +93,7 @@ type PutObjectConfig struct { // AppendContent bool ResumePartSize int64 MaxResumePutTries int + ProxyReader ProxyReader } type MoveObjectConfig struct { @@ -278,6 +281,34 @@ func getPartInfo(partSize, fsize int64) (int64, int64, error) { return partSize, partNum, nil } +func (up *UpYun) getMultipartUploadProcess(config *PutObjectConfig, fsize int64) (*ResumeProcessResult, error) { + resumeProcessResult, _ := up.GetResumeProcess(config.Path) + if resumeProcessResult != nil { + if resumeProcessResult.Order { + return resumeProcessResult, nil + } + } + + initMultipartUploadConfig := &InitMultipartUploadConfig{ + Path: config.Path, + ContentLength: fsize, + PartSize: config.ResumePartSize, + ContentType: config.Headers["Content-Type"], + OrderUpload: true, + } + initMultipartUploadResult, err := up.InitMultipartUpload(initMultipartUploadConfig) + if err != nil { + return nil, err + } + return &ResumeProcessResult{ + UploadID: initMultipartUploadResult.UploadID, + Path: initMultipartUploadConfig.Path, + NextPartID: 0, + NextPartSize: config.ResumePartSize, + Parts: make([]*DisorderPart, 0), + }, nil +} + func (up *UpYun) resumePut(config *PutObjectConfig) error { f, ok := config.Reader.(*os.File) if !ok { @@ -301,7 +332,6 @@ func (up *UpYun) resumePut(config *PutObjectConfig) error { if config.Headers == nil { config.Headers = make(map[string]string) } - headers := config.Headers var breakpoint *BreakPointConfig if up.Recorder != nil { @@ -310,23 +340,16 @@ func (up *UpYun) resumePut(config *PutObjectConfig) error { // first upload or file has expired maxPartID := int((fsize+config.ResumePartSize-1)/config.ResumePartSize - 1) - var uploadInfo *InitMultipartUploadResult if breakpoint == nil || isRecordExpired(fileinfo, breakpoint) { - uploadInfo, err = up.InitMultipartUpload(&InitMultipartUploadConfig{ - Path: config.Path, - PartSize: config.ResumePartSize, - ContentType: headers["Content-Type"], - ContentLength: fsize, - OrderUpload: true, - }) + uploadProcess, err := up.getMultipartUploadProcess(config, fsize) if err != nil { return err } breakpoint = &BreakPointConfig{ - UploadID: uploadInfo.UploadID, - PartSize: uploadInfo.PartSize, - PartID: 0, + UploadID: uploadProcess.UploadID, + PartSize: uploadProcess.NextPartSize, + PartID: int(uploadProcess.NextPartID), } if up.Recorder != nil { @@ -876,21 +899,15 @@ type BreakPointConfig struct { LastTime time.Time } -func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPointConfig, f *os.File, fileInfo fs.FileInfo) (*BreakPointConfig, error) { +func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPointConfig, f io.Reader, fileInfo fs.FileInfo) (*BreakPointConfig, error) { fsize := fileInfo.Size() - maxPartID := int((fsize+config.ResumePartSize-1)/config.ResumePartSize - 1) partID := breakpoint.PartID curSize, partSize := int64(partID)*breakpoint.PartSize, breakpoint.PartSize - - for id := partID; id <= maxPartID; id++ { - if curSize+partSize > fsize { - partSize = fsize - curSize - } - fragFile, err := newFragmentFile(f, curSize, partSize) - if err != nil { - return breakpoint, errorOperation("new fragment file", err) - } - + bytesLeft := fsize - curSize + ch := make(chan *Chunk, 1) + var err error + go GetReadChunk(config.ProxyReader(curSize, f), bytesLeft, partSize, ch) + for chunk := range ch { try := 0 for ; config.MaxResumePutTries == 0 || try < config.MaxResumePutTries; try++ { err = up.UploadPart( @@ -900,9 +917,9 @@ func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPoin PartSize: breakpoint.PartSize, }, &UploadPartConfig{ - PartID: id, - PartSize: partSize, - Reader: fragFile, + PartID: chunk.ID(), + PartSize: int64(chunk.Len()), + Reader: chunk, }) if err == nil { break @@ -910,7 +927,7 @@ func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPoin } if config.MaxResumePutTries > 0 && try == config.MaxResumePutTries { - breakpoint.PartID = id + breakpoint.PartID = chunk.ID() breakpoint.FileSize = fsize breakpoint.LastTime = time.Now() breakpoint.FileModTime = fileInfo.ModTime() @@ -921,7 +938,7 @@ func (up *UpYun) resumeUploadPart(config *PutObjectConfig, breakpoint *BreakPoin return breakpoint, err } curSize += partSize - breakpoint.PartID = id + 1 + breakpoint.PartID = chunk.ID() + 1 } return breakpoint, nil } @@ -940,6 +957,7 @@ type ResumeDisorderResult struct { type ResumeProcessResult struct { UploadID string Path string + Order bool NextPartSize int64 NextPartID int64 Parts []*DisorderPart @@ -965,6 +983,11 @@ func (up *UpYun) GetResumeProcess(path string) (*ResumeProcessResult, error) { partSizeStr := resp.Header.Get("X-Upyun-Next-Part-Size") partIDStr := resp.Header.Get("X-Upyun-Next-Part-Id") uploadID := resp.Header.Get("X-Upyun-Multi-Uuid") + order := resp.Header.Get("X-Upyun-Meta-Order") + o := true + if order == "false" { + o = false + } if partSizeStr != "" { partSize, err = strconv.ParseInt(partSizeStr, 10, 64) @@ -995,6 +1018,7 @@ func (up *UpYun) GetResumeProcess(path string) (*ResumeProcessResult, error) { NextPartSize: partSize, NextPartID: partID, Path: path, + Order: o, Parts: disorderRes.Parts, }, nil }