From 09fa125adbe59d02e1e67470bfeebcb6b9c1596e Mon Sep 17 00:00:00 2001 From: Tulsi Shah <46474643+Tulsishah@users.noreply.github.com> Date: Wed, 20 Nov 2024 19:56:26 +0530 Subject: [PATCH] feat(gensupport): per-chunk transfer timeout configs (#2865) feat(gensupport): per-chunk transfer timeout configs Allow users to configure the per-chunk transfer timeout for retries that's used during resumable uploads. Needs to be exposed via the manual layer for storage. Tested the feature(with default timeout and with some random value) with storagetestbench server emulator. --- googleapi/googleapi.go | 15 ++++++++++++ internal/gensupport/media.go | 19 ++++++++------- internal/gensupport/media_test.go | 33 ++++++++++++++++++++++----- internal/gensupport/resumable.go | 28 ++++++++++++++++++++++- internal/gensupport/resumable_test.go | 3 ++- 5 files changed, 82 insertions(+), 16 deletions(-) diff --git a/googleapi/googleapi.go b/googleapi/googleapi.go index b5e38c66282..04a10f51c97 100644 --- a/googleapi/googleapi.go +++ b/googleapi/googleapi.go @@ -259,6 +259,20 @@ func ChunkSize(size int) MediaOption { return chunkSizeOption(size) } +type chunkTransferTimeoutOption time.Duration + +func (cd chunkTransferTimeoutOption) setOptions(o *MediaOptions) { + o.ChunkTransferTimeout = time.Duration(cd) +} + +// ChunkTransferTimeout returns a MediaOption which sets a per-chunk +// transfer timeout for resumable uploads. If a single chunk has been +// attempting to upload for longer than this time then the old req got canceled and retried. +// The default is no timeout for the request. +func ChunkTransferTimeout(timeout time.Duration) MediaOption { + return chunkTransferTimeoutOption(timeout) +} + type chunkRetryDeadlineOption time.Duration func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) { @@ -283,6 +297,7 @@ type MediaOptions struct { ForceEmptyContentType bool ChunkSize int ChunkRetryDeadline time.Duration + ChunkTransferTimeout time.Duration } // ProcessMediaOptions stores options from opts in a MediaOptions. diff --git a/internal/gensupport/media.go b/internal/gensupport/media.go index c048a57084b..0861d4d3c87 100644 --- a/internal/gensupport/media.go +++ b/internal/gensupport/media.go @@ -135,13 +135,14 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer // code only. type MediaInfo struct { // At most one of Media and MediaBuffer will be set. - media io.Reader - buffer *MediaBuffer - singleChunk bool - mType string - size int64 // mediaSize, if known. Used only for calls to progressUpdater_. - progressUpdater googleapi.ProgressUpdater - chunkRetryDeadline time.Duration + media io.Reader + buffer *MediaBuffer + singleChunk bool + mType string + size int64 // mediaSize, if known. Used only for calls to progressUpdater_. + progressUpdater googleapi.ProgressUpdater + chunkRetryDeadline time.Duration + chunkTransferTimeout time.Duration } // NewInfoFromMedia should be invoked from the Media method of a call. It returns a @@ -157,6 +158,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo { } } mi.chunkRetryDeadline = opts.ChunkRetryDeadline + mi.chunkTransferTimeout = opts.ChunkTransferTimeout mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize) return mi } @@ -294,7 +296,8 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload { mi.progressUpdater(curr, mi.size) } }, - ChunkRetryDeadline: mi.chunkRetryDeadline, + ChunkRetryDeadline: mi.chunkRetryDeadline, + ChunkTransferTimeout: mi.chunkTransferTimeout, } } diff --git a/internal/gensupport/media_test.go b/internal/gensupport/media_test.go index 65fd62b3dcc..f018fabf8fe 100644 --- a/internal/gensupport/media_test.go +++ b/internal/gensupport/media_test.go @@ -218,12 +218,13 @@ func TestUploadRequestGetBody(t *testing.T) { func TestResumableUpload(t *testing.T) { for _, test := range []struct { - desc string - r io.Reader - chunkSize int - wantUploadType string - wantResumableUpload bool - chunkRetryDeadline time.Duration + desc string + r io.Reader + chunkSize int + wantUploadType string + wantResumableUpload bool + chunkRetryDeadline time.Duration + chunkTransferTimeOut time.Duration }{ { desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk", @@ -263,11 +264,22 @@ func TestResumableUpload(t *testing.T) { wantResumableUpload: true, chunkRetryDeadline: 1 * time.Second, }, + { + desc: "confirm that ChunkTransferTimeout is carried to ResumableUpload", + r: &nullReader{2 * googleapi.MinUploadChunkSize}, + chunkSize: 1, + wantUploadType: "resumable", + wantResumableUpload: true, + chunkTransferTimeOut: 5 * time.Second, + }, } { opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)} if test.chunkRetryDeadline != 0 { opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline)) } + if test.chunkTransferTimeOut != 0 { + opts = append(opts, googleapi.ChunkTransferTimeout(test.chunkTransferTimeOut)) + } mi := NewInfoFromMedia(test.r, opts) if got, want := mi.UploadType(), test.wantUploadType; got != want { t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want) @@ -284,6 +296,15 @@ func TestResumableUpload(t *testing.T) { t.Errorf("%s: test case invalid; resumable upload is nil", test.desc) } } + if test.chunkTransferTimeOut != 0 { + if got := mi.ResumableUpload(""); got != nil { + if got.ChunkTransferTimeout != test.chunkTransferTimeOut { + t.Errorf("%s: ChunkTransferTimeout: got %v, want %v", test.desc, got.ChunkTransferTimeout, test.chunkTransferTimeOut) + } + } else { + t.Errorf("%s: test case invalid; resumable upload is nil", test.desc) + } + } } } diff --git a/internal/gensupport/resumable.go b/internal/gensupport/resumable.go index f828ddb60e6..9e3bcf15963 100644 --- a/internal/gensupport/resumable.go +++ b/internal/gensupport/resumable.go @@ -43,6 +43,10 @@ type ResumableUpload struct { // retries should happen. ChunkRetryDeadline time.Duration + // ChunkTransferTimeout configures the per-chunk transfer timeout. If a chunk upload stalls for longer than + // this duration, the upload will be retried. + ChunkTransferTimeout time.Duration + // Track current request invocation ID and attempt count for retry metrics // and idempotency headers. invocationID string @@ -241,13 +245,35 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err default: } - resp, err = rx.transferChunk(ctx) + // rCtx is derived from a context with a defined transferTimeout with non-zero value. + // If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded, + // triggering a retry of the request. + var rCtx context.Context + var cancel context.CancelFunc + + rCtx = ctx + if rx.ChunkTransferTimeout != 0 { + rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout) + } + + resp, err = rx.transferChunk(rCtx) var status int if resp != nil { status = resp.StatusCode } + // The upload should be retried if the rCtx is canceled due to a timeout. + select { + case <-rCtx.Done(): + if errors.Is(rCtx.Err(), context.DeadlineExceeded) { + // Cancel the context for rCtx + cancel() + continue + } + default: + } + // Check if we should retry the request. if !errorFunc(status, err) { quitAfterTimer.Stop() diff --git a/internal/gensupport/resumable_test.go b/internal/gensupport/resumable_test.go index 0000d403e0c..5b1e39a5945 100644 --- a/internal/gensupport/resumable_test.go +++ b/internal/gensupport/resumable_test.go @@ -6,6 +6,7 @@ package gensupport import ( "context" + "errors" "fmt" "io" "net/http" @@ -280,7 +281,7 @@ func TestCancelUploadBasic(t *testing.T) { defer func() { backoff = oldBackoff }() res, err := rx.Upload(ctx) - if err != context.Canceled { + if !errors.Is(err, context.Canceled) { t.Fatalf("Upload err: got: %v; want: context cancelled", err) } if res != nil {