From 6634006b2a6f96f5014dc36609bcb277977900b3 Mon Sep 17 00:00:00 2001 From: mike Date: Wed, 25 Oct 2023 14:30:21 -0500 Subject: [PATCH] add a method to consume the pending list of a stream one by one, letting the consumer know when there is no pending elements for him --- pkg/consumer/consumer.go | 37 +++++++++++++++++++++++++++++++++++++ redsumer.go | 18 ++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index cf0a81e..862db55 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -70,3 +70,40 @@ func Consume(ctx context.Context, client *redis.Client, groupName string, consum } return claimMessages, nil } + +func ConsumePendingOneByOne(ctx context.Context, client *redis.Client, groupName string, consumerName string, streamName string, id string) (*redis.XMessage, error) { + + _, err := client.XReadGroup(context.Background(), &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 500, + NoAck: false, + Block: 1, + }).Result() + + if err != nil && !os.IsTimeout(err) { + err := fmt.Errorf("error xreadgroup: %v", err) + return nil, err + } + + entries, err := client.XReadGroup(context.Background(), &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, id}, + Count: 1, + NoAck: false, + Block: 1, + }).Result() + + if err != nil && !os.IsTimeout(err) { + err := fmt.Errorf("error xreadgroup: %v", err) + return nil, err + } + + if len(entries) > 0 { + return &entries[0].Messages[0], nil + } + + return nil, nil +} diff --git a/redsumer.go b/redsumer.go index 64c888d..56d7d17 100644 --- a/redsumer.go +++ b/redsumer.go @@ -28,6 +28,7 @@ type RedConsumerArgs struct { RedisHost string RedisPort int Db int + StreamIndex *string } type RedProducerArgs struct { @@ -56,6 +57,23 @@ func (c RedConsumer) Consume(ctx context.Context) ([]redis.XMessage, error) { return messages, err } +func (c RedConsumer) ConsumePendingOneByOne(ctx context.Context) (*redis.XMessage, error) { + if c.args.StreamIndex == nil { + strStreamIndex := "0-0" + c.args.StreamIndex = &strStreamIndex + } + + message, err := consumer.ConsumePendingOneByOne(ctx, c.client, c.args.Group, c.args.ConsumerName, c.args.Stream, *c.args.StreamIndex) + + if message != nil { + c.args.StreamIndex = &message.ID + } else { + c.args.StreamIndex = nil + } + + return message, err +} + // given a list of tries([]int) this method will wait until the stream is ready // if the stream is not ready after the tries it will return an erro // the numbers in the tries array represent the number of seconds that the loop will wait until it tries again