diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..3694010 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,138 @@ +--- +name: Build and Publish + +"on": + push: + branches: + - "**" + tags: + - "v*.*.*" + pull_request: + branches: + - main + - master + workflow_dispatch: + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: "1.22" + + - name: Build + run: go build -v ./... + + - name: Vet + run: go vet -v ./... + + - name: Test + run: go test -v ./... + + hadolint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: hadolint/hadolint-action@v3.1.0 + with: + dockerfile: Dockerfile + + yamllint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Run yamllint + uses: bewuethr/yamllint-action@v1 + + shellcheck: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Run ShellCheck + uses: ludeeus/action-shellcheck@master + + oci_image: + name: Build OCI Image + needs: build + if: github.repository == 'lsst-dm/s3nd' + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=schedule + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + + gh-release: + name: Create GitHub Release + needs: build + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Release + uses: softprops/action-gh-release@v2 + + go-release: + name: Release Go Binaries + needs: gh-release + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + strategy: + matrix: + goos: [linux, darwin] + goarch: [amd64, arm64] + steps: + - uses: actions/checkout@v4 + - uses: wangyoucao577/go-release-action@v1 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + goos: ${{ matrix.goos }} + goarch: ${{ matrix.goarch }} + goversion: "1.22" + asset_name: '${{ github.event.repository.name }}-${{ matrix.goos }}-${{ matrix.goarch }}' diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..08e7509 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/s3nd diff --git a/.hadolint.yaml b/.hadolint.yaml new file mode 100644 index 0000000..430835c --- /dev/null +++ b/.hadolint.yaml @@ -0,0 +1,4 @@ +--- +ignored: + # disable pinning apk package versions + - DL3018 diff --git a/.yamllint.yaml b/.yamllint.yaml new file mode 100644 index 0000000..8074ccb --- /dev/null +++ b/.yamllint.yaml @@ -0,0 +1,17 @@ +--- +extends: "default" + +rules: + # 80 chars should be enough, but don't fail if a line is longer + line-length: false + indentation: + level: "error" + spaces: 2 + indent-sequences: true + # do not obsess over comment formatting + comments-indentation: false + comments: + level: "error" + require-starting-space: false + document-start: + level: "error" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..33ba332 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.22-alpine AS builder + +RUN apk --update --no-cache add \ + binutils \ + && rm -rf /root/.cache +WORKDIR /go/src/github.com/lsst-dm/s3nd +COPY . . +RUN CGO_ENABLED=0 go build -ldflags "-extldflags '-static'" -o s3nd && strip s3nd + +FROM alpine:3 +WORKDIR /root/ +COPY --from=builder /go/src/github.com/lsst-dm/s3nd/s3nd /bin/s3nd +ENTRYPOINT ["/bin/s3nd"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1692a9d --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +all: + CGO_ENABLED=0 go build -ldflags "-extldflags '-static'" -o s3nd diff --git a/conf/conf.go b/conf/conf.go new file mode 100644 index 0000000..113c988 --- /dev/null +++ b/conf/conf.go @@ -0,0 +1,134 @@ +package conf + +import ( + "flag" + "log" + "os" + "strconv" + "time" + + k8sresource "k8s.io/apimachinery/pkg/api/resource" +) + +type S3ndConf struct { + Host *string + Port *int + EndpointUrl *string + UploadMaxParallel *int64 + UploadTimeout *time.Duration + QueueTimeout *time.Duration + UploadTries *int + UploadPartsize *k8sresource.Quantity + UploadBwlimit *k8sresource.Quantity + UploadWriteBufferSize *k8sresource.Quantity +} + +// Parse the environment variables and flags. If a flag is not set, the +// environment variable is used. Errors are fatal. +func NewConf() S3ndConf { + conf := S3ndConf{} + + // start flags + conf.Host = flag.String("host", os.Getenv("S3ND_HOST"), "S3 Daemon Host (S3ND_HOST)") + + defaultPort, _ := strconv.Atoi(os.Getenv("S3ND_PORT")) + if defaultPort == 0 { + defaultPort = 15555 + } + conf.Port = flag.Int("port", defaultPort, "S3 Daemon Port (S3ND_PORT)") + + conf.EndpointUrl = flag.String("endpoint-url", os.Getenv("S3ND_ENDPOINT_URL"), "S3 Endpoint URL (S3ND_ENDPOINT_URL)") + + var defaultUploadMaxParallel int64 + defaultUploadMaxParallel, _ = strconv.ParseInt(os.Getenv("S3ND_UPLOAD_MAX_PARALLEL"), 10, 64) + if defaultUploadMaxParallel == 0 { + defaultUploadMaxParallel = 100 + } + conf.UploadMaxParallel = flag.Int64("upload-max-parallel", defaultUploadMaxParallel, "Maximum number of parallel object uploads (S3ND_UPLOAD_MAX_PARALLEL)") + + defaultUploadTimeout := os.Getenv("S3ND_UPLOAD_TIMEOUT") + if defaultUploadTimeout == "" { + defaultUploadTimeout = "10s" + } + uploadTimeout := flag.String("upload-timeout", defaultUploadTimeout, "Upload Timeout (S3ND_UPLOAD_TIMEOUT)") + + defaultQueueTimeout := os.Getenv("S3ND_QUEUE_TIMEOUT") + if defaultQueueTimeout == "" { + defaultQueueTimeout = "10s" + } + queueTimeout := flag.String("queue-timeout", defaultQueueTimeout, "Queue Timeout waiting for transfer to start (S3ND_QUEUE_TIMEOUT)") + + defaultUploadTries, _ := strconv.Atoi(os.Getenv("S3ND_UPLOAD_TRIES")) + if defaultUploadTries == 0 { + defaultUploadTries = 1 + } + conf.UploadTries = flag.Int("upload-tries", defaultUploadTries, "Max number of upload tries (S3ND_UPLOAD_TRIES)") + + defaultUploadPartsize := os.Getenv("S3ND_UPLOAD_PARTSIZE") + if defaultUploadPartsize == "" { + defaultUploadPartsize = "5Mi" + } + uploadPartsizeRaw := flag.String("upload-partsize", defaultUploadPartsize, "Upload Part Size (S3ND_UPLOAD_PARTSIZE)") + + defaultUploadBwlimit := os.Getenv("S3ND_UPLOAD_BWLIMIT") + if defaultUploadBwlimit == "" { + defaultUploadBwlimit = "0" + } + uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)") + + defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE") + if defaultUploadWriteBufferSize == "" { + defaultUploadWriteBufferSize = "64Ki" + } + uploadWriteBufferSizeRaw := flag.String("upload-write-buffer-size", defaultUploadWriteBufferSize, "Upload Write Buffer Size (S3ND_UPLOAD_WRITE_BUFFER_SIZE)") + + flag.Parse() + // end flags + + if *conf.EndpointUrl == "" { + log.Fatal("S3ND_ENDPOINT_URL is required") + } + + uploadTimeoutDuration, err := time.ParseDuration(*uploadTimeout) + if err != nil { + log.Fatal("S3ND_UPLOAD_TIMEOUT is invalid") + } + conf.UploadTimeout = &uploadTimeoutDuration + + queueTimeoutDuration, err := time.ParseDuration(*queueTimeout) + if err != nil { + log.Fatal("S3ND_QUEUE_TIMEOUT is invalid") + } + conf.QueueTimeout = &queueTimeoutDuration + + uploadPartsize, err := k8sresource.ParseQuantity(*uploadPartsizeRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_PARTSIZE is invalid") + } + conf.UploadPartsize = &uploadPartsize + + uploadBwlimit, err := k8sresource.ParseQuantity(*uploadBwlimitRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_BWLIMIT is invalid") + } + conf.UploadBwlimit = &uploadBwlimit + + uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid") + } + conf.UploadWriteBufferSize = &uploadWriteBufferSize + + log.Println("S3ND_HOST:", *conf.Host) + log.Println("S3ND_PORT:", *conf.Port) + log.Println("S3ND_ENDPOINT_URL:", *conf.EndpointUrl) + log.Println("S3ND_UPLOAD_MAX_PARALLEL:", *conf.UploadMaxParallel) + log.Println("S3ND_UPLOAD_TIMEOUT:", *conf.UploadTimeout) + log.Println("S3ND_QUEUE_TIMEOUT:", *conf.QueueTimeout) + log.Println("S3ND_UPLOAD_TRIES:", *conf.UploadTries) + log.Println("S3ND_UPLOAD_PARTSIZE:", conf.UploadPartsize.String()) + log.Println("S3ND_UPLOAD_BWLIMIT:", conf.UploadBwlimit.String()) + log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.UploadWriteBufferSize.String()) + + return conf +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7186fe5 --- /dev/null +++ b/go.mod @@ -0,0 +1,36 @@ +module github.com/lsst-dm/s3nd + +go 1.22.7 + +require ( + github.com/aws/aws-sdk-go-v2 v1.32.4 + github.com/aws/aws-sdk-go-v2/config v1.28.3 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.37 + github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 + github.com/hyperledger/fabric v2.1.1+incompatible + golang.org/x/sys v0.26.0 + k8s.io/apimachinery v0.31.2 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.44 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect + github.com/aws/smithy-go v1.22.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/onsi/gomega v1.35.1 // indirect + github.com/x448/float16 v0.8.4 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..babecfa --- /dev/null +++ b/go.sum @@ -0,0 +1,101 @@ +github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE= +github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA= +github.com/aws/aws-sdk-go-v2/config v1.28.3 h1:kL5uAptPcPKaJ4q0sDUjUIdueO18Q7JDzl64GpVwdOM= +github.com/aws/aws-sdk-go-v2/config v1.28.3/go.mod h1:SPEn1KA8YbgQnwiJ/OISU4fz7+F6Fe309Jf0QTsRCl4= +github.com/aws/aws-sdk-go-v2/credentials v1.17.44 h1:qqfs5kulLUHUEXlHEZXLJkgGoF3kkUeFUTVA585cFpU= +github.com/aws/aws-sdk-go-v2/credentials v1.17.44/go.mod h1:0Lm2YJ8etJdEdw23s+q/9wTpOeo2HhNE97XcRa7T8MA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 h1:woXadbf0c7enQ2UGCi8gW/WuKmE0xIzxBF/eD94jMKQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19/go.mod h1:zminj5ucw7w0r65bP6nhyOd3xL6veAUMc3ElGMoLVb4= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.37 h1:jHKR76E81sZvz1+x1vYYrHMxphG5LFBJPhSqEr4CLlE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.37/go.mod h1:iMkyPkmoJWQKzSOtaX+8oEJxAuqr7s8laxcqGDSHeII= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23 h1:1SZBDiRzzs3sNhOMVApyWPduWYGAX0imGy06XiBnCAM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23/go.mod h1:i9TkxgbZmHVh2S0La6CAXtnyFhlCX/pJ0JsOvBAS6Mk= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4 h1:aaPpoG15S2qHkWm4KlEyF01zovK1nW4BBbyXuHNSE90= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4/go.mod h1:eD9gS2EARTKgGr/W5xwgY/ik9z/zqpW+m/xOQbVxrMk= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 h1:tHxQi/XHPK0ctd/wdOw0t7Xrc2OxcRCnVzv8lwWPu0c= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4/go.mod h1:4GQbF1vJzG60poZqWatZlhP31y8PGCCVTvIGPdaaYJ0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 h1:E5ZAVOmI2apR8ADb72Q63KqwwwdW1XcMeXIlrZ1Psjg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4/go.mod h1:wezzqVUOVVdk+2Z/JzQT4NxAU0NbhRe5W8pIE72jsWI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 h1:neNOYJl72bHrz9ikAEED4VqWyND/Po0DnEx64RW6YM4= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3/go.mod h1:TMhLIyRIyoGVlaEMAt+ITMbwskSTpcGsCPDq91/ihY0= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.5 h1:HJwZwRt2Z2Tdec+m+fPjvdmkq2s9Ra+VR0hjF7V2o40= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.5/go.mod h1:wrMCEwjFPms+V86TCQQeOxQF/If4vT44FGIOFiMC2ck= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 h1:zcx9LiGWZ6i6pjdcoE9oXAB6mUdeyC36Ia/QEiIvYdg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4/go.mod h1:Tp/ly1cTjRLGBBmNccFumbZ8oqpZlpdhFf80SrRh4is= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGIxF+on3KOISbK5IU= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hyperledger/fabric v2.1.1+incompatible h1:cYYRv3vVg4kA6DmrixLxwn1nwBEUuYda8DsMwlaMKbY= +github.com/hyperledger/fabric v2.1.1+incompatible/go.mod h1:tGFAOCT696D3rG0Vofd2dyWYLySHlh0aQjf7Q1HAju0= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw= +k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= diff --git a/handler/handler.go b/handler/handler.go new file mode 100644 index 0000000..0fec39e --- /dev/null +++ b/handler/handler.go @@ -0,0 +1,233 @@ +package handler + +import ( + "context" + "errors" + "fmt" + "html" + "log" + "net" + "net/http" + "net/url" + "os" + "path/filepath" + "syscall" + "time" + + "github.com/lsst-dm/s3nd/conf" + + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/hyperledger/fabric/common/semaphore" + "golang.org/x/sys/unix" +) + +type S3ndHandler struct { + conf *conf.S3ndConf + awsConfig *aws.Config + s3Client *s3.Client + uploader *manager.Uploader + parallelUploads *semaphore.Semaphore +} + +type s3ndUploadTask struct { + uri *url.URL + bucket *string + key *string + file *string +} + +func NewHandler(conf *conf.S3ndConf) *S3ndHandler { + handler := &S3ndHandler{ + conf: conf, + } + + maxConns := int(*conf.UploadMaxParallel * 5) // allow for multipart upload creation + + var httpClient *awshttp.BuildableClient + + if conf.UploadBwlimit.Value() != 0 { + dialer := &net.Dialer{ + Control: func(network, address string, conn syscall.RawConn) error { + // https://pkg.go.dev/syscall#RawConn + var operr error + if err := conn.Control(func(fd uintptr) { + operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.UploadBwlimit.Value()/8)) + }); err != nil { + return err + } + return operr + }, + } + + httpClient = awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) { + t.ExpectContinueTimeout = 0 + t.IdleConnTimeout = 0 + t.MaxIdleConns = maxConns + t.MaxConnsPerHost = maxConns + t.MaxIdleConnsPerHost = maxConns + t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value()) + // disable http/2 to prevent muxing over a single tcp connection + t.ForceAttemptHTTP2 = false + t.TLSClientConfig.NextProtos = []string{"http/1.1"} + t.DialContext = dialer.DialContext + }) + } else { + httpClient = awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) { + t.ExpectContinueTimeout = 0 + t.IdleConnTimeout = 0 + t.MaxIdleConns = maxConns + t.MaxConnsPerHost = maxConns + t.MaxIdleConnsPerHost = maxConns + t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value()) + // disable http/2 to prevent muxing over a single tcp connection + t.ForceAttemptHTTP2 = false + t.TLSClientConfig.NextProtos = []string{"http/1.1"} + }) + } + + awsCfg, err := config.LoadDefaultConfig( + context.TODO(), + config.WithBaseEndpoint(*conf.EndpointUrl), + config.WithHTTPClient(httpClient), + ) + if err != nil { + log.Fatal(err) + } + + handler.awsConfig = &awsCfg + + handler.s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.UsePathStyle = true + o.Retryer = aws.NopRetryer{} // we handle retries ourselves + }) + + handler.uploader = manager.NewUploader(handler.s3Client, func(u *manager.Uploader) { + u.Concurrency = 1000 + u.MaxUploadParts = 1000 + u.PartSize = conf.UploadPartsize.Value() + }) + + sema := semaphore.New(int(*conf.UploadMaxParallel)) + handler.parallelUploads = &sema + + return handler +} + +func (h *S3ndHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + task, err := h.parseRequest(r) + if err != nil { + w.Header().Set("x-error", err.Error()) + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "error parsing request: %s\n", err) + return + } + + log.Printf("queuing %v:%v | source %v\n", *task.bucket, *task.key, *task.file) + + // limit the number of parallel uploads + semaCtx, cancel := context.WithTimeout(r.Context(), *h.conf.QueueTimeout) + defer cancel() + if err := h.parallelUploads.Acquire(semaCtx); err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintf(w, "error acquiring semaphore: %s\n", err) + log.Printf("queue %v:%v | failed after %s: %s\n", *task.bucket, *task.key, time.Now().Sub(start), err) + return + } + defer h.parallelUploads.Release() + + if err := h.uploadFileMultipart(r.Context(), task); err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "error uploading file: %s\n", err) + return + } + + fmt.Fprintf(w, "Successful put %q\n", html.EscapeString(task.uri.String())) +} + +func (h *S3ndHandler) parseRequest(r *http.Request) (*s3ndUploadTask, error) { + file := r.PostFormValue("file") + if file == "" { + return nil, fmt.Errorf("missing field: file") + } + uriRaw := r.PostFormValue("uri") + if uriRaw == "" { + return nil, fmt.Errorf("missing field: uri") + } + + if !filepath.IsAbs(file) { + return nil, fmt.Errorf("Only absolute file paths are supported: %q", html.EscapeString(file)) + } + + uri, err := url.Parse(uriRaw) + if err != nil { + return nil, fmt.Errorf("Unable to parse URI: %q", html.EscapeString(uriRaw)) + } + + if uri.Scheme != "s3" { + return nil, fmt.Errorf("Only s3 scheme is supported: %q", html.EscapeString(uriRaw)) + } + + bucket := uri.Host + if bucket == "" { + return nil, fmt.Errorf("Unable to parse bucket from URI: %q", html.EscapeString(uriRaw)) + } + key := uri.Path[1:] // Remove leading slash + + return &s3ndUploadTask{uri: uri, bucket: &bucket, key: &key, file: &file}, nil +} + +func (h *S3ndHandler) uploadFileMultipart(ctx context.Context, task *s3ndUploadTask) error { + start := time.Now() + file, err := os.Open(*task.file) + if err != nil { + log.Printf("upload %v:%v | Couldn't open file %v to upload because: %v\n", *task.bucket, *task.key, *task.file, err) + return err + } + defer file.Close() + + maxAttempts := *h.conf.UploadTries + var attempt int + for attempt = 1; attempt <= maxAttempts; attempt++ { + uploadCtx, cancel := context.WithTimeout(ctx, *h.conf.UploadTimeout) + defer cancel() + _, err = h.uploader.Upload(uploadCtx, &s3.PutObjectInput{ + Bucket: aws.String(*task.bucket), + Key: aws.String(*task.key), + Body: file, + }) + if err != nil { + log.Printf("upload %v:%v | failed after %s -- try %v/%v\n", *task.bucket, *task.key, time.Now().Sub(start), attempt, maxAttempts) + var noBucket *types.NoSuchBucket + if errors.As(err, &noBucket) { + log.Printf("upload %v:%v | Bucket does not exist.\n", *task.bucket, *task.key) + // Don't retry if the bucket doesn't exist. + return noBucket + } + + if errors.Is(err, context.Canceled) { + log.Printf("upload %v:%v | context cancelled\n", *task.bucket, *task.key) + // Don't retry if the client disconnected + return err + } + + log.Printf("upload %v:%v | failed because: %v\n", *task.bucket, *task.key, err) + + // bubble up the error if we've exhausted our attempts + if attempt == maxAttempts { + return err + } + } else { + break + } + } + + log.Printf("upload %v:%v | success in %s after %v/%v tries\n", *task.bucket, *task.key, time.Now().Sub(start), attempt, maxAttempts) + return nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..b427cdf --- /dev/null +++ b/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "errors" + "fmt" + "log" + "net/http" + + "github.com/lsst-dm/s3nd/conf" + "github.com/lsst-dm/s3nd/handler" +) + +func main() { + conf := conf.NewConf() + + handler := handler.NewHandler(&conf) + http.Handle("/", handler) + + addr := fmt.Sprintf("%s:%d", *conf.Host, *conf.Port) + log.Println("Listening on", addr) + + err := http.ListenAndServe(addr, nil) + if errors.Is(err, http.ErrServerClosed) { + log.Printf("server closed\n") + } else if err != nil { + log.Fatalf("error starting server: %s\n", err) + } +}