Wait group for senders #6

Open
lzap opened this issue Jun 14, 2022 · 1 comment

lzap commented Jun 14, 2022

Hey,

While digging into the codebase and testing it, I found a bug. Messages are sent in separate goroutines that nothing waits for, so when the service is asked to shut down and the main function returns, all other goroutines die immediately and any message still being sent or retried is lost.

To solve this, I've implemented a simple wait group in my copy of your code:

type client struct {
	sqs      *sqs.Client
	queueURL string
	logger   log.Logger
	senderWG sync.WaitGroup // NEW: counts in-flight sender goroutines

	handlers     map[string]Handler
	heartbeatSec int
	maxBeats     int
	workerPool   int
	maxMessages  int
}

func (c *client) Enqueue(ctx context.Context, jobType string, body interface{}, extraAttributes ...string) error {
	bytes, err := json.Marshal(body)
	if err != nil {
		return err
	}

	deduplicationId := generateDeduplicationId()
	attributes := []string{"dedup_id", deduplicationId}
	if len(extraAttributes) > 0 {
		attributes = append(attributes, extraAttributes...)
	}
	sqsInput := &sqs.SendMessageInput{
		MessageBody:            aws.String(string(bytes)),
		MessageAttributes:      defaultSQSAttributes(jobType, attributes...),
		MessageGroupId:         aws.String(jobType),
		MessageDeduplicationId: aws.String(deduplicationId),
		QueueUrl:               aws.String(c.queueURL),
	}

	c.senderWG.Add(1) // NEW: register the sender before its goroutine starts
	go c.sendDirectMessage(ctx, sqsInput)
	return nil
}

func (c *client) sendDirectMessage(ctx context.Context, input *sqs.SendMessageInput, retryCount ...int) {
	var count int
	if len(retryCount) != 0 {
		count = retryCount[0]
	}

	if count > maxRetryCount-1 {
		c.logger.Log(ctx, log.LogLevelError, "too many failures, giving up", nil)
		c.senderWG.Done()
		return
	}

	if _, err := c.sqs.SendMessage(ctx, input); err != nil {
		if err.Error() == errDataLimit.Error() {
			c.logger.Log(ctx, log.LogLevelError, "payload limit overflow, giving up", nil)
			c.senderWG.Done()
			return
		}

		c.logger.Log(ctx, log.LogLevelWarn, "error publishing, trying again in 10 seconds: "+err.Error(), nil)
		time.Sleep(10 * time.Second)
		// Retry recursively; the nested call, not this one, calls Done exactly once.
		c.sendDirectMessage(ctx, input, count+1)
	} else {
		c.logger.Log(ctx, log.LogLevelTrace, "message sent", nil)
		c.senderWG.Done()
	}
}

// Wait blocks until every message passed to Enqueue has been sent or given up on.
func (c *client) Wait() {
	c.senderWG.Wait()
}

Then, at the very end of the main function, I call Wait() to ensure all sender goroutines have finished before the service shuts down gracefully.

Apologies for not sending a patch; my copy of your codebase has diverged quite a bit, and my use case is different. Cheers!

qhenkart (Owner) commented

@lzap I really appreciate you taking the time to come and create an issue with your findings.
