Skip to content

Commit

Permalink
Add changing visibility timeout for each call
Browse files Browse the repository at this point in the history
  • Loading branch information
danielle-tfh committed Dec 2, 2024
1 parent 3e23538 commit 4f661f0
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 25 deletions.
17 changes: 1 addition & 16 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
WorkersNum int
VisibilityTimeout int32
BatchSize int32
ExtendEnabled bool
}

type Consumer struct {
Expand Down Expand Up @@ -56,6 +55,7 @@ loop:
MaxNumberOfMessages: c.cfg.BatchSize,
WaitTimeSeconds: int32(5),
MessageAttributeNames: []string{"TraceID", "SpanID"},
VisibilityTimeout: c.cfg.VisibilityTimeout,
})
if err != nil {
zap.S().With(zap.Error(err)).Error("could not receive messages from SQS")
Expand Down Expand Up @@ -83,9 +83,6 @@ func (c *Consumer) worker(ctx context.Context, messages <-chan *Message) {

func (c *Consumer) handleMsg(ctx context.Context, m *Message) error {
if c.handler != nil {
if c.cfg.ExtendEnabled {
c.extend(ctx, m)
}
if err := c.handler.Run(ctx, m); err != nil {
return m.ErrorResponse(err)
}
Expand All @@ -104,15 +101,3 @@ func (c *Consumer) delete(ctx context.Context, m *Message) error {
zap.S().Debug("message deleted")
return nil
}

func (c *Consumer) extend(ctx context.Context, m *Message) {
_, err := c.sqs.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
QueueUrl: &c.cfg.QueueURL,
ReceiptHandle: m.ReceiptHandle,
VisibilityTimeout: c.cfg.VisibilityTimeout,
})
if err != nil {
zap.S().With(zap.Error(err)).Error("unable to extend message")
return
}
}
2 changes: 0 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func TestConsume(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
consumer := NewConsumer(awsCfg, config, msgHandler)
go consumer.Consume(ctx)
Expand Down Expand Up @@ -109,7 +108,6 @@ func TestConsume_GracefulShutdown(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
msgHandler := MsgHandler{}
consumer := NewConsumer(awsCfg, config, &msgHandler)
Expand Down
3 changes: 0 additions & 3 deletions consumer_with_idle_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ func (c *ConsumerWithIdleTrigger) handleMsg(ctx context.Context, m *Message) err
return m.ErrorResponse(err)
}
} else {
if c.cfg.ExtendEnabled {
c.extend(ctx, m)
}
if err := c.handler.Run(ctx, m); err != nil {
return m.ErrorResponse(err)
}
Expand Down
4 changes: 0 additions & 4 deletions consumer_with_idle_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestConsumeWithIdleTrigger(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
go consumer.Consume(ctx)
Expand Down Expand Up @@ -99,7 +98,6 @@ func TestConsumeWithIdleTimeout_GracefulShutdown(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
msgHandler := MsgHandlerWithIdleTrigger{
t: t,
Expand Down Expand Up @@ -150,7 +148,6 @@ func TestConsumeWithIdleTimeout_TimesOut(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
msgHandler := MsgHandlerWithIdleTrigger{
t: t,
Expand Down Expand Up @@ -193,7 +190,6 @@ func TestConsumeWithIdleTimeout_TimesOutAndConsumes(t *testing.T) {
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
msgHandler := handlerWithIdleTrigger(t, expectedMsg, expectedMsgAttributes)
consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout, SqsReceiveWaitTimeSeconds)
Expand Down

0 comments on commit 4f661f0

Please sign in to comment.