Skip to content

Commit

Permalink
feat(alias): add DownloadConcurrency and DownloadPartSize option (#…
Browse files Browse the repository at this point in the history
…7829)

* fix(net): goroutine logic bug (#7215)

* Fix goroutine logic bug

* Fix bug

---------

Co-authored-by: hpy hs <[email protected]>

* perf(net): sequential and dynamic concurrency

* fix(net): incorrect error return

* feat(alias):  add `DownloadConcurrency` and `DownloadPartSize` option

* feat(net): add `ConcurrencyLimit`

* pref(net): create `chunk` on demand

* refactor

* refactor

* fix(net): `r.Closers.Add` has no effect

* refactor

---------

Co-authored-by: hpy hs <[email protected]>
  • Loading branch information
j2rong4cn and hshpy authored Jan 27, 2025
1 parent bdcf450 commit 2be0c3d
Show file tree
Hide file tree
Showing 24 changed files with 396 additions and 238 deletions.
10 changes: 10 additions & 0 deletions drivers/alias/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
for _, dst := range dsts {
link, err := d.link(ctx, dst, sub, args)
if err == nil {
if !args.Redirect && len(link.URL) > 0 {
// 正常情况下 多并发 仅支持返回URL的驱动
// alias套娃alias 可以让crypt、mega等驱动(不返回URL的) 支持并发
if d.DownloadConcurrency > 0 {
link.Concurrency = d.DownloadConcurrency
}
if d.DownloadPartSize > 0 {
link.PartSize = d.DownloadPartSize * utils.KB
}
}
return link, nil
}
}
Expand Down
6 changes: 4 additions & 2 deletions drivers/alias/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ type Addition struct {
// Usually one of two
// driver.RootPath
// define other
Paths string `json:"paths" required:"true" type:"text"`
ProtectSameName bool `json:"protect_same_name" default:"true" required:"false" help:"Protects same-name files from Delete or Rename"`
Paths string `json:"paths" required:"true" type:"text"`
ProtectSameName bool `json:"protect_same_name" default:"true" required:"false" help:"Protects same-name files from Delete or Rename"`
DownloadConcurrency int `json:"download_concurrency" default:"0" required:"false" type:"number" help:"Need to enable proxy"`
DownloadPartSize int `json:"download_part_size" default:"0" type:"number" required:"false" help:"Need to enable proxy. Unit: KB"`
}

var config = driver.Config{
Expand Down
10 changes: 8 additions & 2 deletions drivers/alias/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/sign"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/alist-org/alist/v3/server/common"
Expand Down Expand Up @@ -94,10 +95,15 @@ func (d *Alias) list(ctx context.Context, dst, sub string, args *fs.ListArgs) ([

func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs) (*model.Link, error) {
reqPath := stdpath.Join(dst, sub)
storage, err := fs.GetStorage(reqPath, &fs.GetStoragesArgs{})
// 参考 crypt 驱动
storage, reqActualPath, err := op.GetStorageAndActualPath(reqPath)
if err != nil {
return nil, err
}
if _, ok := storage.(*Alias); !ok && !args.Redirect {
link, _, err := op.Link(ctx, storage, reqActualPath, args)
return link, err
}
_, err = fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true})
if err != nil {
return nil, err
Expand All @@ -114,7 +120,7 @@ func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs)
}
return link, nil
}
link, _, err := fs.Link(ctx, reqPath, args)
link, _, err := op.Link(ctx, storage, reqActualPath, args)
return link, err
}

Expand Down
7 changes: 2 additions & 5 deletions drivers/crypt/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
rrc = converted
}
if rrc != nil {
//remoteRangeReader, err :=
remoteReader, err := rrc.RangeRead(ctx, http_range.Range{Start: underlyingOffset, Length: length})
remoteClosers.AddClosers(rrc.GetClosers())
if err != nil {
Expand All @@ -288,10 +287,8 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
if err != nil {
return nil, err
}
//remoteClosers.Add(remoteLink.MFile)
//keep reuse same MFile and close at last.
remoteClosers.Add(remoteLink.MFile)
return io.NopCloser(remoteLink.MFile), nil
// 可以直接返回,读取完也不会调用Close,直到连接断开Close
return remoteLink.MFile, nil
}

return nil, errs.NotSupport
Expand Down
15 changes: 8 additions & 7 deletions drivers/github/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
stdpath "path"
"strings"
"sync"
"text/template"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
"io"
"net/http"
stdpath "path"
"strings"
"sync"
"text/template"
)

type Github struct {
Expand Down Expand Up @@ -656,7 +657,7 @@ func (d *Github) putBlob(ctx context.Context, stream model.FileStreamer, up driv
contentReader, contentWriter := io.Pipe()
go func() {
encoder := base64.NewEncoder(base64.StdEncoding, contentWriter)
if _, err := io.Copy(encoder, stream); err != nil {
if _, err := utils.CopyWithBuffer(encoder, stream); err != nil {
_ = contentWriter.CloseWithError(err)
return
}
Expand Down
16 changes: 7 additions & 9 deletions drivers/halalcloud/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import (
"context"
"crypto/sha1"
"fmt"
"io"
"net/url"
"path"
"strconv"
"time"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/http_range"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -19,11 +24,6 @@ import (
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
"github.com/rclone/rclone/lib/readers"
"github.com/zzzhr1990/go-common-entity/userfile"
"io"
"net/url"
"path"
"strconv"
"time"
)

type HalalCloud struct {
Expand Down Expand Up @@ -251,7 +251,6 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin

size := result.FileSize
chunks := getChunkSizes(result.Sizes)
var finalClosers utils.Closers
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
length := httpRange.Length
if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size {
Expand All @@ -269,7 +268,6 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin
sha: result.Sha1,
shaTemp: sha1.New(),
}
finalClosers.Add(oo)

return readers.NewLimitedReadCloser(oo, length), nil
}
Expand All @@ -281,7 +279,7 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin
duration = time.Until(time.Now().Add(time.Hour))
}

resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader, Closers: finalClosers}
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader}
return &model.Link{
RangeReadCloser: resultRangeReadCloser,
Expiration: &duration,
Expand Down
4 changes: 1 addition & 3 deletions drivers/mega/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*
//}

size := file.GetSize()
var finalClosers utils.Closers
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
length := httpRange.Length
if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size {
Expand All @@ -103,11 +102,10 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*
d: down,
skip: httpRange.Start,
}
finalClosers.Add(oo)

return readers.NewLimitedReadCloser(oo, length), nil
}
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader, Closers: finalClosers}
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader}
resultLink := &model.Link{
RangeReadCloser: resultRangeReadCloser,
}
Expand Down
1 change: 0 additions & 1 deletion drivers/netease_music/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (lrc *LyricObj) getLyricLink() *model.Link {
sr := io.NewSectionReader(reader, httpRange.Start, httpRange.Length)
return io.NopCloser(sr), nil
},
Closers: utils.EmptyClosers(),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion drivers/netease_music/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (u *uploader) init(stream model.FileStreamer) error {
}

h := md5.New()
io.Copy(h, stream)
utils.CopyWithBuffer(h, stream)
u.md5 = hex.EncodeToString(h.Sum(nil))
_, err := u.file.Seek(0, io.SeekStart)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions drivers/quqi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ func (d *Quqi) linkFromCDN(id string) (*model.Link, error) {
bufferReader := bufio.NewReader(decryptReader)
bufferReader.Discard(int(decryptedOffset))

return utils.NewReadCloser(bufferReader, func() error {
return nil
}), nil
return io.NopCloser(bufferReader), nil
}

return &model.Link{
Expand Down
4 changes: 4 additions & 0 deletions internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/alist-org/alist/v3/cmd/flags"
"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/net"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/caarlos0/env/v9"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,6 +64,9 @@ func InitConfig() {
log.Fatalf("update config struct error: %+v", err)
}
}
if conf.Conf.MaxConcurrency > 0 {
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
}
if !conf.Conf.Force {
confFromEnv()
}
Expand Down
2 changes: 2 additions & 0 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type Config struct {
Log LogConfig `json:"log"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Tasks TasksConfig `json:"tasks" envPrefix:"TASKS_"`
Cors Cors `json:"cors" envPrefix:"CORS_"`
Expand Down Expand Up @@ -151,6 +152,7 @@ func DefaultConfig() *Config {
MaxAge: 28,
},
MaxConnections: 0,
MaxConcurrency: 64,
TlsInsecureSkipVerify: true,
Tasks: TasksConfig{
Download: TaskConfig{
Expand Down
11 changes: 6 additions & 5 deletions internal/model/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type ListArgs struct {
}

type LinkArgs struct {
IP string
Header http.Header
Type string
HttpReq *http.Request
IP string
Header http.Header
Type string
HttpReq *http.Request
Redirect bool
}

type Link struct {
Expand Down Expand Up @@ -87,7 +88,7 @@ type RangeReadCloser struct {
utils.Closers
}

func (r RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
func (r *RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
rc, err := r.RangeReader(ctx, httpRange)
r.Closers.Add(rc)
return rc, err
Expand Down
Loading

0 comments on commit 2be0c3d

Please sign in to comment.