Skip to content

Commit

Permalink
Add SQS wait timeout for idle trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
danielle-tfh committed Oct 14, 2024
1 parent a7ffeb3 commit dd77a72
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
26 changes: 14 additions & 12 deletions consumer_with_idle_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@ import (
)

type ConsumerWithIdleTrigger struct {
sqs *sqs.Client
handler HandlerWithIdleTrigger
wg *sync.WaitGroup
cfg Config
idleDurationTimeout time.Duration
sqs *sqs.Client
handler HandlerWithIdleTrigger
wg *sync.WaitGroup
cfg Config
idleDurationTimeout time.Duration
sqsReceiveWaitTimeSeconds int32
}

func NewConsumerWithIdleTrigger(awsCfg aws.Config, cfg Config, handler HandlerWithIdleTrigger, idleDurationTimeout time.Duration) *ConsumerWithIdleTrigger {
func NewConsumerWithIdleTrigger(awsCfg aws.Config, cfg Config, handler HandlerWithIdleTrigger, idleDurationTimeout time.Duration, sqsReceiveWaitTimeSeconds int32) *ConsumerWithIdleTrigger {
return &ConsumerWithIdleTrigger{
sqs: sqs.NewFromConfig(awsCfg),
handler: handler,
wg: &sync.WaitGroup{},
cfg: cfg,
idleDurationTimeout: idleDurationTimeout,
sqs: sqs.NewFromConfig(awsCfg),
handler: handler,
wg: &sync.WaitGroup{},
cfg: cfg,
idleDurationTimeout: idleDurationTimeout,
sqsReceiveWaitTimeSeconds: sqsReceiveWaitTimeSeconds,
}
}

Expand Down Expand Up @@ -57,7 +59,7 @@ loop:
output, err := c.sqs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: &c.cfg.QueueURL,
MaxNumberOfMessages: c.cfg.BatchSize,
WaitTimeSeconds: int32(c.idleDurationTimeout.Seconds()),
WaitTimeSeconds: c.sqsReceiveWaitTimeSeconds,
MessageAttributeNames: []string{"TraceID", "SpanID"},
})
if err != nil {
Expand Down
17 changes: 9 additions & 8 deletions consumer_with_idle_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type MsgHandlerWithIdleTrigger struct {
}

const (
IdleTimeout = 500 * time.Millisecond
IdleTimeout = 500 * time.Millisecond
SqsReceiveWaitTimeSeconds = int32(1)
)

func TestConsumeWithIdleTrigger(t *testing.T) {
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestConsumeWithIdleTrigger(t *testing.T) {
BatchSize: batchSize,
ExtendEnabled: true,
}
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
go consumer.Consume(ctx)

t.Cleanup(func() {
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestConsumeWithIdleTimeout_GracefulShutdown(t *testing.T) {
t: t,
msgsReceivedCount: 0,
}
consumer := NewConsumerWithIdleTrigger(awsCfg, config, &msgHandler, IdleTimeout)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, &msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
var wg sync.WaitGroup
wg.Add(2)

Expand Down Expand Up @@ -155,15 +156,15 @@ func TestConsumeWithIdleTimeout_TimesOut(t *testing.T) {
t: t,
msgsReceivedCount: 0,
}
consumer := NewConsumerWithIdleTrigger(awsCfg, config, &msgHandler, IdleTimeout)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, &msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
go consumer.Consume(ctx)

t.Cleanup(func() {
cancel()
})

// Wait for the timeout
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 3)

// ensure that it gets called multiple times
assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 2)
Expand Down Expand Up @@ -195,7 +196,7 @@ func TestConsumeWithIdleTimeout_TimesOutAndConsumes(t *testing.T) {
ExtendEnabled: true,
}
msgHandler := handlerWithIdleTrigger(t, expectedMsg, expectedMsgAttributes)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
go consumer.Consume(ctx)

t.Cleanup(func() {
Expand All @@ -206,7 +207,7 @@ func TestConsumeWithIdleTimeout_TimesOutAndConsumes(t *testing.T) {
}
cancel()
})
time.Sleep(time.Second * 1)
time.Sleep(time.Second * 2)

// ensure that it gets called first before receiving a message
assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 1)
Expand All @@ -218,7 +219,7 @@ func TestConsumeWithIdleTimeout_TimesOutAndConsumes(t *testing.T) {
time.Sleep(time.Second * 2)
// Check that the message arrived
assert.Equal(t, 1, msgHandler.msgsReceivedCount)
assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 3)
assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 2)

}

Expand Down

0 comments on commit dd77a72

Please sign in to comment.