Skip to content

Commit

Permalink
rm S3ND_UPLOAD_BWLIMIT_INTERNAL & github.com/conduitio/bwlimit
Browse files Browse the repository at this point in the history
github.com/conduitio/bwlimit's approach to trying to time writes to the
socket proved to be fairly inaccurate in terms of actual data rate and
seems inferior to SO_MAX_PACING_RATE.
  • Loading branch information
jhoblitt committed Nov 22, 2024
1 parent 09ee251 commit 06e0b70
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 36 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.28.3
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.37
github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3
github.com/conduitio/bwlimit v0.1.0
github.com/hyperledger/fabric v2.1.1+incompatible
golang.org/x/sys v0.26.0
k8s.io/apimachinery v0.31.2
Expand All @@ -33,6 +32,5 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/onsi/gomega v1.35.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/time v0.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGI
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/conduitio/bwlimit v0.1.0 h1:x3ijON0TSghQob4tFKaEvKixFmYKfVJQeSpXluC2JvE=
github.com/conduitio/bwlimit v0.1.0/go.mod h1:E+ASZ1/5L33MTb8hJTERs5Xnmh6Ulq3jbRh7LrdbXWU=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
Expand Down Expand Up @@ -87,8 +85,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
42 changes: 12 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/conduitio/bwlimit"
"github.com/hyperledger/fabric/common/semaphore"
"golang.org/x/sys/unix"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -38,7 +37,6 @@ type S3ndConf struct {
uploadTries *int
uploadPartsize *k8sresource.Quantity
uploadBwlimit *k8sresource.Quantity
uploadBwlimitInteral bool
uploadWriteBufferSize *k8sresource.Quantity
}

Expand Down Expand Up @@ -222,9 +220,6 @@ func getConf() S3ndConf {
}
uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)")

defaultUploadBwlimitInternal, _ := strconv.ParseBool(os.Getenv("S3ND_UPLOAD_BWLIMIT_INTERNAL"))
uploadBwlimitInternal := flag.Bool("upload-bwlimit-internal", defaultUploadBwlimitInternal, "Use internal tcp pacing instead of fq (S3ND_UPLOAD_BWLIMIT_INTERNAL)")

defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE")
if defaultUploadWriteBufferSize == "" {
defaultUploadWriteBufferSize = "64Ki"
Expand Down Expand Up @@ -262,8 +257,6 @@ func getConf() S3ndConf {
}
conf.uploadBwlimit = &uploadBwlimit

conf.uploadBwlimitInteral = *uploadBwlimitInternal

uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid")
Expand All @@ -279,7 +272,6 @@ func getConf() S3ndConf {
log.Println("S3ND_UPLOAD_TRIES:", *conf.uploadTries)
log.Println("S3ND_UPLOAD_PARTSIZE:", conf.uploadPartsize.String())
log.Println("S3ND_UPLOAD_BWLIMIT:", conf.uploadBwlimit.String())
log.Println("S3ND_UPLOAD_BWLIMIT_INTERNAL:", conf.uploadBwlimitInteral)
log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.uploadWriteBufferSize.String())

return conf
Expand All @@ -295,27 +287,17 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {
var httpClient *awshttp.BuildableClient

if conf.uploadBwlimit.Value() != 0 {
var dialCtx func(ctx context.Context, network, address string) (net.Conn, error)

if conf.uploadBwlimitInteral {
dialCtx = bwlimit.NewDialer(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 0,
}, bwlimit.Byte(conf.uploadBwlimit.Value()/8), 0).DialContext
} else {
dialer := &net.Dialer{
Control: func(network, address string, conn syscall.RawConn) error {
// https://pkg.go.dev/syscall#RawConn
var operr error
if err := conn.Control(func(fd uintptr) {
operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.uploadBwlimit.Value()/8))
}); err != nil {
return err
}
return operr
},
}
dialCtx = dialer.DialContext
dialer := &net.Dialer{
Control: func(network, address string, conn syscall.RawConn) error {
// https://pkg.go.dev/syscall#RawConn
var operr error
if err := conn.Control(func(fd uintptr) {
operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.uploadBwlimit.Value()/8))
}); err != nil {
return err
}
return operr
},
}

httpClient = awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
Expand All @@ -328,7 +310,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {
// disable http/2 to prevent muxing over a single tcp connection
t.ForceAttemptHTTP2 = false
t.TLSClientConfig.NextProtos = []string{"http/1.1"}
t.DialContext = dialCtx
t.DialContext = dialer.DialContext
})
} else {
httpClient = awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
Expand Down

0 comments on commit 06e0b70

Please sign in to comment.