Skip to content

Commit

Permalink
Merge pull request #206 from kaleido-io/inflight-reset
Browse files Browse the repository at this point in the history
Inflight reset
  • Loading branch information
peterbroadhurst authored Mar 18, 2022
2 parents 6f5a277 + 6857002 commit 2e95ecd
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 99 deletions.
143 changes: 93 additions & 50 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,72 +239,112 @@ func (a *eventStream) postUpdateStream() {
a.batchCond.L.Lock()
a.startEventHandlers(false)
a.updateInProgress = false
a.inFlight = 0
a.batchCond.L.Unlock()
}

// update modifies an existing eventStream
func (a *eventStream) update(newSpec *StreamInfo) (spec *StreamInfo, err error) {
log.Infof("%s: Update event stream", a.spec.ID)
// set a flag to indicate updateInProgress
// For any go routines that are Wait() ing on the eventListener, wake them up
if err := a.preUpdateStream(); err != nil {
return nil, err
func (a *eventStream) checkUpdate(newSpec *StreamInfo) (updatedSpec *StreamInfo, err error) {

// setUpdated marks that there is a change, and creates a copied object
specCopy := *a.spec
setUpdated := func() *StreamInfo {
if updatedSpec == nil {
updatedSpec = &specCopy
}
return updatedSpec
}
// wait for the poked goroutines to finish up
<-a.eventPollerDone
<-a.batchProcessorDone
<-a.batchDispatcherDone
defer a.postUpdateStream()

if newSpec.Type != "" && newSpec.Type != a.spec.Type {
if newSpec.Type != "" && newSpec.Type != specCopy.Type {
return nil, errors.Errorf(errors.EventStreamsCannotUpdateType)
}
if a.spec.Type == "webhook" && newSpec.Webhook != nil {
if newSpec.Webhook.URL == "" {
return nil, errors.Errorf(errors.EventStreamsWebhookNoURL)
if specCopy.Type == "webhook" && newSpec.Webhook != nil {
if newSpec.Webhook.RequestTimeoutSec != 0 && newSpec.Webhook.RequestTimeoutSec != specCopy.Webhook.RequestTimeoutSec {
setUpdated().Webhook.RequestTimeoutSec = newSpec.Webhook.RequestTimeoutSec
}
if newSpec.Webhook.TLSkipHostVerify != specCopy.Webhook.TLSkipHostVerify {
setUpdated().Webhook.TLSkipHostVerify = newSpec.Webhook.TLSkipHostVerify
}
if _, err = url.Parse(newSpec.Webhook.URL); err != nil {
return nil, errors.Errorf(errors.EventStreamsWebhookInvalidURL)
if newSpec.Webhook.URL != "" && newSpec.Webhook.URL != specCopy.Webhook.URL {
if _, err = url.Parse(newSpec.Webhook.URL); err != nil {
return nil, errors.Errorf(errors.EventStreamsWebhookInvalidURL)
}
setUpdated().Webhook.URL = newSpec.Webhook.URL
}
if newSpec.Webhook.RequestTimeoutSec == 0 {
newSpec.Webhook.RequestTimeoutSec = 120
for k, v := range newSpec.Webhook.Headers {
if specCopy.Webhook.Headers == nil || specCopy.Webhook.Headers[k] != v {
setUpdated().Webhook.Headers = newSpec.Webhook.Headers
break
}
}
a.spec.Webhook.URL = newSpec.Webhook.URL
a.spec.Webhook.RequestTimeoutSec = newSpec.Webhook.RequestTimeoutSec
a.spec.Webhook.TLSkipHostVerify = newSpec.Webhook.TLSkipHostVerify
a.spec.Webhook.Headers = newSpec.Webhook.Headers
}
if a.spec.Type == "websocket" && newSpec.WebSocket != nil {
a.spec.WebSocket.Topic = newSpec.WebSocket.Topic
if err := validateWebSocket(newSpec.WebSocket); err != nil {
return nil, err
}
if specCopy.Type == "websocket" && newSpec.WebSocket != nil {
if newSpec.WebSocket.Topic != specCopy.WebSocket.Topic {
setUpdated().WebSocket.Topic = newSpec.WebSocket.Topic
}
if newSpec.WebSocket.DistributionMode != specCopy.WebSocket.DistributionMode {
setUpdated().WebSocket.DistributionMode = newSpec.WebSocket.DistributionMode
}
// Validate if we changed it
if updatedSpec != nil {
if err := validateWebSocket(newSpec.WebSocket); err != nil {
return nil, err
}
}
a.spec.WebSocket.DistributionMode = newSpec.WebSocket.DistributionMode
}

if a.spec.BatchSize != newSpec.BatchSize && newSpec.BatchSize != 0 && newSpec.BatchSize < MaxBatchSize {
a.spec.BatchSize = newSpec.BatchSize
if specCopy.BatchSize != newSpec.BatchSize && newSpec.BatchSize != 0 && newSpec.BatchSize < MaxBatchSize {
setUpdated().BatchSize = newSpec.BatchSize
}
if a.spec.BatchTimeoutMS != newSpec.BatchTimeoutMS && newSpec.BatchTimeoutMS != 0 {
a.spec.BatchTimeoutMS = newSpec.BatchTimeoutMS
if specCopy.BatchTimeoutMS != newSpec.BatchTimeoutMS && newSpec.BatchTimeoutMS != 0 {
setUpdated().BatchTimeoutMS = newSpec.BatchTimeoutMS
}
if a.spec.BlockedRetryDelaySec != newSpec.BlockedRetryDelaySec && newSpec.BlockedRetryDelaySec != 0 {
a.spec.BlockedRetryDelaySec = newSpec.BlockedRetryDelaySec
if specCopy.BlockedRetryDelaySec != newSpec.BlockedRetryDelaySec && newSpec.BlockedRetryDelaySec != 0 {
setUpdated().BlockedRetryDelaySec = newSpec.BlockedRetryDelaySec
}
if strings.ToLower(newSpec.ErrorHandling) == ErrorHandlingBlock {
a.spec.ErrorHandling = ErrorHandlingBlock
} else {
a.spec.ErrorHandling = ErrorHandlingSkip
if newSpec.ErrorHandling != "" && newSpec.ErrorHandling != specCopy.ErrorHandling {
if strings.ToLower(newSpec.ErrorHandling) == ErrorHandlingBlock {
setUpdated().ErrorHandling = ErrorHandlingBlock
} else {
setUpdated().ErrorHandling = ErrorHandlingSkip
}
}
if newSpec.Name != "" && a.spec.Name != newSpec.Name {
a.spec.Name = newSpec.Name
if newSpec.Name != "" && specCopy.Name != newSpec.Name {
setUpdated().Name = newSpec.Name
}
if a.spec.Timestamps != newSpec.Timestamps {
a.spec.Timestamps = newSpec.Timestamps
if specCopy.Timestamps != newSpec.Timestamps {
setUpdated().Timestamps = newSpec.Timestamps
}
if specCopy.Inputs != newSpec.Inputs {
setUpdated().Inputs = newSpec.Inputs
}

// Return a non-nil object ONLY if there's a change
return updatedSpec, nil
}

// update modifies an existing eventStream
func (a *eventStream) update(newSpec *StreamInfo) (spec *StreamInfo, err error) {
log.Infof("%s: Update event stream", a.spec.ID)
updatedSpec, err := a.checkUpdate(newSpec)
if err != nil {
return nil, err
}
if a.spec.Inputs != newSpec.Inputs {
a.spec.Inputs = newSpec.Inputs
if updatedSpec == nil {
log.Infof("%s: No change", a.spec.ID)
return a.spec, nil
}

// set a flag to indicate updateInProgress
// For any go routines that are Wait() ing on the eventListener, wake them up
if err := a.preUpdateStream(); err != nil {
return nil, err
}
a.spec = updatedSpec
// wait for the poked goroutines to finish up
<-a.eventPollerDone
<-a.batchProcessorDone
<-a.batchDispatcherDone
defer a.postUpdateStream()
return a.spec, nil
}

Expand Down Expand Up @@ -381,12 +421,14 @@ func (a *eventStream) resume() error {
func (a *eventStream) isBlocked() bool {
a.batchCond.L.Lock()
inFlight := a.inFlight
v := inFlight >= a.spec.BatchSize
isBlocked := inFlight >= a.spec.BatchSize
a.batchCond.L.Unlock()
if v {
if isBlocked {
log.Warnf("%s: Is currently blocked. InFlight=%d BatchSize=%d", a.spec.ID, inFlight, a.spec.BatchSize)
} else if inFlight > 0 {
log.Debugf("%s: InFlight=%d BatchSize=%d", a.spec.ID, inFlight, a.spec.BatchSize)
}
return v
return isBlocked
}

func (a *eventStream) markAllSubscriptionsStale(ctx context.Context) {
Expand Down Expand Up @@ -423,7 +465,8 @@ func (a *eventStream) eventPoller() {
// Clear any checkpoint
delete(checkpoint, sub.info.ID)
}
if sub.filterStale && !sub.deleting {
stale := sub.filterStale
if stale && !sub.deleting {
blockHeight, exists := checkpoint[sub.info.ID]
if !exists || blockHeight.Cmp(big.NewInt(0)) <= 0 {
blockHeight, err = sub.setInitialBlockHeight(ctx)
Expand Down
78 changes: 31 additions & 47 deletions internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ func TestUpdateStreamSwapType(t *testing.T) {
assert.Regexp("The type of an event stream cannot be changed", err)
}

func TestUpdateStreamInProgress(t *testing.T) {
func TestUpdateStreamNoOpUpdate(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
defer cleanup(t, dir)
Expand All @@ -1363,6 +1363,32 @@ func TestUpdateStreamInProgress(t *testing.T) {

stream.updateInProgress = true
_, err := stream.update(&StreamInfo{})
assert.NoError(err)
}

func TestUpdateStreamInProgress(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingSkip,
BatchSize: 5,
Webhook: &webhookActionInfo{},
}, db, 200)
defer svr.Close()
defer close(eventStream)
defer stream.stop(false)

stream.updateInProgress = true
_, err := stream.update(&StreamInfo{
Webhook: &webhookActionInfo{
URL: "http://newurl.example.com",
},
ErrorHandling: ErrorHandlingBlock,
})
assert.Regexp("Update to event stream already in progress", err)
}

Expand Down Expand Up @@ -1419,6 +1445,7 @@ func TestUpdateWebSocket(t *testing.T) {

ctx := context.Background()
updateSpec := &StreamInfo{
ErrorHandling: ErrorHandlingSkip,
WebSocket: &webSocketActionInfo{
Topic: "test2",
},
Expand All @@ -1429,51 +1456,6 @@ func TestUpdateWebSocket(t *testing.T) {
assert.NoError(err)
}

func TestUpdateStreamMissingWebhookURL(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
defer cleanup(t, dir)

db, _ := kvstore.NewLDBKeyValueStore(dir)
sm, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
Webhook: &webhookActionInfo{},
}, db, 200)
defer svr.Close()

s := setupTestSubscription(assert, sm, stream, "mySubName")
assert.Equal("mySubName", s.Name)

// We expect three events to be sent to the webhook
// With the default batch size of 1, that means three separate requests
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
<-eventStream
<-eventStream
<-eventStream
wg.Done()
}()
wg.Wait()

ctx := context.Background()
updateSpec := &StreamInfo{
Webhook: &webhookActionInfo{
URL: "",
TLSkipHostVerify: true,
RequestTimeoutSec: 5,
},
}
_, err := sm.UpdateStream(ctx, stream.spec.ID, updateSpec)
assert.Regexp(errors.EventStreamsWebhookNoURL.Code(), err)
err = sm.DeleteSubscription(ctx, s.ID)
assert.NoError(err)
err = sm.DeleteStream(ctx, stream.spec.ID)
assert.NoError(err)
sm.Close(true)
}

func TestUpdateStreamInvalidWebhookURL(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
Expand All @@ -1483,7 +1465,9 @@ func TestUpdateStreamInvalidWebhookURL(t *testing.T) {
sm, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
ErrorHandling: ErrorHandlingBlock,
Webhook: &webhookActionInfo{},
Webhook: &webhookActionInfo{
RequestTimeoutSec: 240,
},
}, db, 200)
defer svr.Close()

Expand Down
2 changes: 1 addition & 1 deletion internal/events/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func TestStreamAndSubscriptionErrors(t *testing.T) {

blockCall := make(chan struct{})
rpc := &ethmocks.RPCClient{}
rpc.On("CallContext", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { <-blockCall }).Return(nil)
rpc.On("CallContext", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).Maybe()
rpc.On("CallContext", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { <-blockCall }).Return(nil)
sm.rpc = rpc

sm.db, _ = kvstore.NewLDBKeyValueStore(path.Join(dir, "db"))
Expand Down
2 changes: 1 addition & 1 deletion internal/events/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ func (w *webSocketAction) attemptBatch(batchNumber, attempt uint64, events []*ev
}

// Pass back any exception from the client
log.Infof("Attempt batch %d complete. ok=%t", batchNumber, err == nil)
log.Infof("WebSocket event batch %d complete (len=%d). err=%v", batchNumber, len(events), err)
return err
}

0 comments on commit 2e95ecd

Please sign in to comment.