From 29e890ddc7cd78d7076b7f22fd994072f70c7890 Mon Sep 17 00:00:00 2001 From: Eran Duchan Date: Tue, 18 Feb 2020 15:54:28 +0200 Subject: [PATCH] Remove lock in consumer group, clean up context API (#52) --- pkg/dataplane/http/context.go | 15 +++++++-------- pkg/dataplane/http/types.go | 9 +++++++++ .../streamconsumergroup/sequencenumberhandler.go | 9 --------- pkg/dataplane/test/streamconsumergroup_test.go | 4 ++-- pkg/dataplane/test/sync_test.go | 10 +++++----- pkg/dataplane/test/test.go | 16 ++++++---------- pkg/dataplane/types.go | 8 -------- 7 files changed, 29 insertions(+), 42 deletions(-) create mode 100644 pkg/dataplane/http/types.go diff --git a/pkg/dataplane/http/context.go b/pkg/dataplane/http/context.go index 92c651b..2f5782f 100755 --- a/pkg/dataplane/http/context.go +++ b/pkg/dataplane/http/context.go @@ -67,11 +67,7 @@ func NewClient(newClientInput *NewClientInput) *fasthttp.Client { } } -func NewDefaultClient() *fasthttp.Client { - return NewClient(&NewClientInput{}) -} - -func NewContext(parentLogger logger.Logger, client *fasthttp.Client, newContextInput *v3io.NewContextInput) (v3io.Context, error) { +func NewContext(parentLogger logger.Logger, newContextInput *NewContextInput) (v3io.Context, error) { requestChanLen := newContextInput.RequestChanLen if requestChanLen == 0 { requestChanLen = 1024 @@ -82,9 +78,14 @@ func NewContext(parentLogger logger.Logger, client *fasthttp.Client, newContextI numWorkers = 8 } + httpClient := newContextInput.HTTPClient + if httpClient == nil { + httpClient = NewClient(&NewClientInput{}) + } + newContext := &context{ logger: parentLogger.GetChild("context.http"), - httpClient: client, + httpClient: httpClient, requestChan: make(chan *v3io.Request, requestChanLen), numWorkers: numWorkers, } @@ -687,8 +688,6 @@ func (c *context) PutRecordsSync(putRecordsInput *v3io.PutRecordsInput) (*v3io.R } buffer.WriteString(`]}`) - str := buffer.String() - fmt.Println(str) response, err := c.sendRequest(&putRecordsInput.DataPlaneInput, http.MethodPost, diff --git a/pkg/dataplane/http/types.go b/pkg/dataplane/http/types.go new file mode 100644 index 0000000..09830ff --- /dev/null +++ b/pkg/dataplane/http/types.go @@ -0,0 +1,9 @@ +package v3iohttp + +import "github.com/valyala/fasthttp" + +type NewContextInput struct { + HTTPClient *fasthttp.Client + NumWorkers int + RequestChanLen int +} diff --git a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go index 4ddafe3..8c9877b 100644 --- a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go +++ b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go @@ -1,7 +1,6 @@ package streamconsumergroup import ( - "sync" "time" "github.com/v3io/v3io-go/pkg/common" @@ -17,7 +16,6 @@ type sequenceNumberHandler struct { logger logger.Logger member *member markedShardSequenceNumbers []uint64 - markedShardSequenceNumbersLock sync.RWMutex stopMarkedShardSequenceNumberCommitterChan chan struct{} lastCommittedShardSequenceNumbers []uint64 } @@ -54,12 +52,7 @@ func (snh *sequenceNumberHandler) stop() error { } func (snh *sequenceNumberHandler) markShardSequenceNumber(shardID int, sequenceNumber uint64) error { - - // lock semantics are reverse - it's OK to write in parallel since each write goes - // to a different cell in the array, but once a read is happening we need to stop the world - snh.markedShardSequenceNumbersLock.RLock() snh.markedShardSequenceNumbers[shardID] = sequenceNumber - snh.markedShardSequenceNumbersLock.RUnlock() return nil } @@ -88,9 +81,7 @@ func (snh *sequenceNumberHandler) commitMarkedShardSequenceNumbers() error { var markedShardSequenceNumbersCopy []uint64 // create a copy of the marked shard sequenceNumbers - snh.markedShardSequenceNumbersLock.Lock() markedShardSequenceNumbersCopy = append(markedShardSequenceNumbersCopy, snh.markedShardSequenceNumbers...) - snh.markedShardSequenceNumbersLock.Unlock() // if there was no chance since last, do nothing if common.Uint64SlicesEqual(snh.lastCommittedShardSequenceNumbers, markedShardSequenceNumbersCopy) { diff --git a/pkg/dataplane/test/streamconsumergroup_test.go b/pkg/dataplane/test/streamconsumergroup_test.go index c5073da..fbf8f4a 100644 --- a/pkg/dataplane/test/streamconsumergroup_test.go +++ b/pkg/dataplane/test/streamconsumergroup_test.go @@ -20,12 +20,12 @@ type recordData struct { } type streamConsumerGroupTestSuite struct { - StreamTestSuite + streamTestSuite streamPath string } func (suite *streamConsumerGroupTestSuite) SetupSuite() { - suite.StreamTestSuite.SetupSuite() + suite.streamTestSuite.SetupSuite() suite.createContainer() suite.streamPath = fmt.Sprintf("%s/test-stream-0/", suite.testPath) } diff --git a/pkg/dataplane/test/sync_test.go b/pkg/dataplane/test/sync_test.go index 5489155..e6cd41f 100644 --- a/pkg/dataplane/test/sync_test.go +++ b/pkg/dataplane/test/sync_test.go @@ -632,22 +632,22 @@ func (suite *syncContainerKVTestSuite) SetupSuite() { type syncStreamTestSuite struct { syncTestSuite - StreamTestSuite StreamTestSuite + streamTestSuite streamTestSuite } func (suite *syncStreamTestSuite) SetupTest() { - suite.StreamTestSuite = StreamTestSuite{ + suite.streamTestSuite = streamTestSuite{ testSuite: suite.syncTestSuite.testSuite, } - suite.StreamTestSuite.SetupTest() + suite.streamTestSuite.SetupTest() } func (suite *syncStreamTestSuite) TearDownTest() { - suite.StreamTestSuite.TearDownTest() + suite.streamTestSuite.TearDownTest() } func (suite *syncStreamTestSuite) TestStream() { - streamPath := fmt.Sprintf("%s/mystream/", suite.StreamTestSuite.testPath) + streamPath := fmt.Sprintf("%s/mystream/", suite.streamTestSuite.testPath) // // Create the stream diff --git a/pkg/dataplane/test/test.go b/pkg/dataplane/test/test.go index 08a1d09..2d2a408 100644 --- a/pkg/dataplane/test/test.go +++ b/pkg/dataplane/test/test.go @@ -37,7 +37,7 @@ func (suite *testSuite) createContext() { var err error // create a context - suite.container, err = v3iohttp.NewContext(suite.logger, v3iohttp.NewDefaultClient(), &v3io.NewContextInput{}) + suite.container, err = v3iohttp.NewContext(suite.logger, &v3iohttp.NewContextInput{}) suite.Require().NoError(err) // populate fields that would have been populated by session/container @@ -57,7 +57,7 @@ func (suite *testSuite) createContext() { func (suite *testSuite) createContainer() { // create a context - context, err := v3iohttp.NewContext(suite.logger, v3iohttp.NewDefaultClient(), &v3io.NewContextInput{}) + context, err := v3iohttp.NewContext(suite.logger, &v3iohttp.NewContextInput{}) suite.Require().NoError(err) session, err := context.NewSession(&v3io.NewSessionInput{ @@ -74,17 +74,13 @@ func (suite *testSuite) createContainer() { suite.Require().NoError(err) } -type StreamTestSuite struct { // nolint: deadcode +type streamTestSuite struct { // nolint: deadcode testSuite testPath string } -func (suite *StreamTestSuite) SetupSuite() { - suite.testSuite.SetupSuite() +func (suite *streamTestSuite) SetupTest() { suite.testPath = "/stream-test" -} - -func (suite *StreamTestSuite) SetupTest() { err := suite.deleteAllStreamsInPath(suite.testPath) // get the underlying root error @@ -97,12 +93,12 @@ func (suite *StreamTestSuite) SetupTest() { } } -func (suite *StreamTestSuite) TearDownTest() { +func (suite *streamTestSuite) TearDownTest() { err := suite.deleteAllStreamsInPath(suite.testPath) suite.Require().NoError(err, "Failed to tear down test suite") } -func (suite *StreamTestSuite) deleteAllStreamsInPath(path string) error { +func (suite *streamTestSuite) deleteAllStreamsInPath(path string) error { getContainerContentsInput := v3io.GetContainerContentsInput{ Path: path, } diff --git a/pkg/dataplane/types.go b/pkg/dataplane/types.go index 053980d..aa4833e 100644 --- a/pkg/dataplane/types.go +++ b/pkg/dataplane/types.go @@ -23,20 +23,12 @@ import ( "strconv" "strings" "time" - - "github.com/valyala/fasthttp" ) // // Control plane // -type NewContextInput struct { - Client *fasthttp.Client - NumWorkers int - RequestChanLen int -} - type NewSessionInput struct { URL string Username string