Skip to content

Commit

Permalink
alive ttl for email queue (#208)
Browse files Browse the repository at this point in the history
Option for the FE to call same endpoint to mark that user is active and
still in queue, once TTL is expired (no events from FE) - no email is
sent to the user.
By default TTL is email expiration time, so it process all the queue
regardless on extra endpoint calls
  • Loading branch information
ice-cronus authored Aug 8, 2024
1 parent 10d20d3 commit 9939d44
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 26 deletions.
8 changes: 5 additions & 3 deletions auth/email_link/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const (

duplicatedSignInRequestsInLessThan = 2 * stdlibtime.Second
loginQueueKey = "login_queue"
loginQueueTTLKey = "login_queue_ttl"
loginRateLimitKey = "login_rate_limit"
initEmailRateLimit = "1000:1m"
)
Expand Down Expand Up @@ -123,9 +124,10 @@ type (
ConfirmationCode struct {
MaxWrongAttemptsCount int64 `yaml:"maxWrongAttemptsCount"`
} `yaml:"confirmationCode"`
DisableEmailSending bool `yaml:"disableEmailSending"`
QueueProcessing bool `yaml:"queueProcessing"`
ExtraLoadBalancersCount int `yaml:"extraLoadBalancersCount"`
DisableEmailSending bool `yaml:"disableEmailSending"`
QueueProcessing bool `yaml:"queueProcessing"`
QueueAliveTTL stdlibtime.Duration `yaml:"queueAliveTTL" mapstructure:"queueAliveTTL"` //nolint:tagliatelle // .
ExtraLoadBalancersCount int `yaml:"extraLoadBalancersCount"`
}
loginID struct {
Email string `json:"email,omitempty" example:"[email protected]"`
Expand Down
124 changes: 112 additions & 12 deletions auth/email_link/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,48 @@ import (
"github.com/ice-blockchain/wintr/time"
)

//nolint:funlen // .
//nolint:funlen,gocognit,revive // .
func (c *client) enqueueLoginAttempt(ctx context.Context, now *time.Time, userEmail string) (queuePosition int64, rateLimit string, err error) {
aliveTTLEnabled := c.cfg.QueueAliveTTL > 0
var result []redis.Cmder
result, err = c.queueDB.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
if zErr := pipeliner.ZAddNX(ctx, loginQueueKey, redis.Z{
Score: float64(now.Nanosecond()),
Score: float64(now.UnixNano()),
Member: userEmail,
}).Err(); zErr != nil {
return zErr //nolint:wrapcheck // .
}
if zRankErr := pipeliner.ZRank(ctx, loginQueueKey, userEmail).Err(); zRankErr != nil {
return zRankErr //nolint:wrapcheck // .
}
if aliveTTLEnabled {
if zErr := pipeliner.ZAdd(ctx, loginQueueTTLKey, redis.Z{
Score: float64(now.UnixNano()),
Member: userEmail,
}).Err(); zErr != nil {
return zErr //nolint:wrapcheck // .
}
}

return pipeliner.Get(ctx, loginRateLimitKey).Err()
})
if err != nil {
return 0, "", errors.Wrapf(err, "failed to enqueue email")
}
errs := make([]error, 0, len(result))
for idx := 2; idx >= 0; idx-- {
rateLimitIdx := 3
if !aliveTTLEnabled {
rateLimitIdx = 2
}
for idx := rateLimitIdx; idx >= 0; idx-- {
cmdRes := result[idx]
if cmdRes.Err() != nil {
errs = append(errs, errors.Wrapf(cmdRes.Err(), "failed to enqueue email because of failed %v", cmdRes.String()))

continue
}
switch idx {
case 2: //nolint:gomnd // Index in pipeline.
case rateLimitIdx:
strCmd := cmdRes.(*redis.StringCmd) //nolint:errcheck,forcetypeassert // .
rateLimit = strCmd.Val()
case 1:
Expand All @@ -69,7 +82,7 @@ func (c *client) enqueueLoginAttempt(ctx context.Context, now *time.Time, userEm
return queuePosition, rateLimit, nil
}

//nolint:funlen,gocognit,revive,contextcheck // Keep processing in signle place.
//nolint:funlen,gocognit,revive,contextcheck,gocyclo,cyclop // Keep processing in signle place.
func (c *client) processEmailQueue(rootCtx context.Context) {
lastProcessed := time.Now()

Expand Down Expand Up @@ -121,21 +134,40 @@ func (c *client) processEmailQueue(rootCtx context.Context) {

rlCount, rlDuration, rlErr := parseRateLimit(rateLimit)
if rlErr != nil {
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback emails %#v back to queue", emails))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores, nil), "failed to rollback emails %#v back to queue", emails))
log.Panic(errors.Wrapf(rlErr, "failed to parse rate limit for email queue %v", rateLimit)) //nolint:revive // .
}
limit := int(math.Min(float64(rlCount), float64(len(emails))))
if rlCount < len(emails) {
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails[rlCount:], scores), "failed to rollback emails %#v back to queue cuz rate limit %v is less than batch %v", emails[rlCount:], rlCount, email.MaxBatchSize)) //nolint:lll // .
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails[rlCount:], scores, nil), "failed to rollback emails %#v back to queue cuz rate limit %v is less than batch %v", emails[rlCount:], rlCount, email.MaxBatchSize)) //nolint:lll // .
emails = emails[:rlCount]
}
var ttls map[string]int64
if c.cfg.QueueAliveTTL > 0 {
reqCtx, reqCancel = context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // .
ttls, err = c.filterEmailsWithAliveTTL(reqCtx, now, &emails, scores) //nolint:contextcheck // Background context.
if err != nil {
log.Error(errors.Wrapf(err, "failed to fetch TTLs for emails"))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores, nil), "failed to rollback emails %#v back to queue", emails))
reqCancel()
_ = wait(rootCtx, 1*stdlibtime.Second) //nolint:errcheck // Noting to rollback.

continue
}
reqCancel()

if len(emails) == 0 {
_ = wait(rootCtx, 1*stdlibtime.Second) //nolint:errcheck // Nothing to rollback.

continue
}
}
reqCtx, reqCancel = context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // .
loginInformation, err := c.fetchLoginInformationForEmailBatch(reqCtx, now, emails, limit)
if err != nil {
log.Error(errors.Wrapf(err, "failed to fetch login information for emails: %v", emails))
reqCancel()
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback emails %#v back to queue", emails))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores, ttls), "failed to rollback emails %#v back to queue", emails))
_ = wait(rootCtx, 1*stdlibtime.Second) //nolint:errcheck // Already rolled back.

continue
Expand All @@ -146,7 +178,7 @@ func (c *client) processEmailQueue(rootCtx context.Context) {
if rateLimitEstimationDuration < rlDuration {
oneBatchProcessingTimeToRespectRateLimit := stdlibtime.Duration(int64(len(emails))/int64(rlCount)) * rlDuration
if wait(rootCtx, oneBatchProcessingTimeToRespectRateLimit) != nil {
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback fetched emails %#v back to queue", emails))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores, ttls), "failed to rollback fetched emails %#v back to queue", emails))

continue
}
Expand All @@ -155,7 +187,7 @@ func (c *client) processEmailQueue(rootCtx context.Context) {
if failed, sErr := c.sendEmails(reqCtx, loginInformation); sErr != nil {
reqCancel()
log.Error(errors.Wrapf(sErr, "failed to send email batch for emails %#v", failed))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(failed, scores), "failed to rollback failed emails %#v back to queue", failed))
log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(failed, scores, ttls), "failed to rollback failed emails %#v back to queue", failed))
stdlibtime.Sleep(1 * stdlibtime.Second)

continue
Expand All @@ -165,18 +197,36 @@ func (c *client) processEmailQueue(rootCtx context.Context) {
}
}

func (c *client) rollbackEmailsBackToQueue(failed []string, scores map[string]int64) error {
func (c *client) rollbackEmailsBackToQueue(failed []string, scores, ttls map[string]int64) error {
if len(failed) == 0 {
return nil
}
rollbackCtx, rollbackCancel := context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // .
defer rollbackCancel()
failedZ := make([]redis.Z, 0, len(failed))
failedTTLs := make([]redis.Z, 0, len(failed))
for _, failedEmail := range failed {
failedZ = append(failedZ, redis.Z{
Score: float64(scores[failedEmail]),
Member: failedEmail,
})
if len(ttls) > 0 {
failedTTLs = append(failedTTLs, redis.Z{
Score: float64(ttls[failedEmail]),
Member: failedEmail,
})
}
}
mErr := multierror.Append(
errors.Wrapf(c.queueDB.ZAddNX(rollbackCtx, loginQueueKey, failedZ...).Err(), "failed to rollback unsent emails %#v", failed),
)
if len(failedTTLs) > 0 {
mErr = multierror.Append(mErr,
errors.Wrapf(c.queueDB.ZAddNX(rollbackCtx, loginQueueTTLKey, failedTTLs...).Err(), "failed to rollback TTL for unsent emails %#v", failed),
)
}

return errors.Wrapf(c.queueDB.ZAddNX(rollbackCtx, loginQueueKey, failedZ...).Err(), "failed to rollback unsent emails %#v", failed)
return mErr.ErrorOrNil() //nolint:wrapcheck // .
}

//nolint:gocritic,revive // We need all the results from the pipeline
Expand Down Expand Up @@ -212,6 +262,56 @@ func (c *client) dequeueNextEmails(ctx context.Context) (emailsBatch []string, s
return emailsBatch, scores, rate, nil
}

//nolint:funlen,gocognit,revive // .
func (c *client) filterEmailsWithAliveTTL(ctx context.Context, now *time.Time, emails *[]string, scores map[string]int64) (ttls map[string]int64, err error) {
ttls = make(map[string]int64, 0)
if len(*emails) == 0 {
return map[string]int64{}, nil
}
pipeRes, err := c.queueDB.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
if ttlBatchErr := pipeliner.ZMScore(ctx, loginQueueTTLKey, (*emails)...).Err(); ttlBatchErr != nil {
return ttlBatchErr //nolint:wrapcheck // .
}
interfaceSlice := make([]any, 0, len(*emails))
for _, m := range *emails {
interfaceSlice = append(interfaceSlice, m)
}

return pipeliner.ZRem(ctx, loginQueueTTLKey, interfaceSlice...).Err()
})
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch email ttls")
}
if ttlErr := pipeRes[0].Err(); ttlErr != nil {
return nil, errors.Wrapf(ttlErr, "failed to fetch %v email ttl", pipeRes[0].String())
}
if len(pipeRes) > 1 {
if remErr := pipeRes[1].Err(); remErr != nil {
return nil, errors.Wrapf(remErr, "failed to del ttl %v email ttl", pipeRes[1].String())
}
}
ttlBatch := pipeRes[0].(*redis.FloatSliceCmd).Val() //nolint:forcetypeassert // .
if len(ttlBatch) == 0 {
return ttls, nil
}
for idx := len(*emails) - 1; idx >= 0; idx-- {
if ttlBatch[idx] == 0 {
continue
}
ttlTime := stdlibtime.Unix(0, int64(ttlBatch[idx])).Add(c.cfg.QueueAliveTTL)
userLeftQueue := now.After(ttlTime)
if userLeftQueue {
delete(scores, (*emails)[idx])
*emails = append((*emails)[:idx], (*emails)[idx+1:]...)

continue
}
ttls[(*emails)[idx]] = int64(ttlBatch[idx])
}

return ttls, nil
}

func (c *client) fetchLoginInformationForEmailBatch(ctx context.Context, now *time.Time, emails []string, limit int) ([]*emailLinkSignIn, error) {
sql := fmt.Sprintf(`
SELECT * FROM public.email_link_sign_ins
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (

require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.7.3 // indirect
cloud.google.com/go/auth v0.8.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.3 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/firestore v1.16.0 // indirect
Expand Down Expand Up @@ -135,7 +135,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo/v2 v2.19.1 // indirect
github.com/onsi/ginkgo/v2 v2.20.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.13 // indirect
Expand Down Expand Up @@ -195,7 +195,7 @@ require (
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/api v0.190.0 // indirect
google.golang.org/api v0.191.0 // indirect
google.golang.org/appengine/v2 v2.0.6 // indirect
google.golang.org/genproto v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14=
cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU=
cloud.google.com/go/auth v0.7.3 h1:98Vr+5jMaCZ5NZk6e/uBgf60phTk/XN84r8QEWB9yjY=
cloud.google.com/go/auth v0.7.3/go.mod h1:HJtWUx1P5eqjy/f6Iq5KeytNpbAcGolPhOgyop2LlzA=
cloud.google.com/go/auth v0.8.0 h1:y8jUJLl/Fg+qNBWxP/Hox2ezJvjkrPb952PC1p0G6A4=
cloud.google.com/go/auth v0.8.0/go.mod h1:qGVp/Y3kDRSDZ5gFD/XPUfYQ9xW1iI7q8RIRoCyBbJc=
cloud.google.com/go/auth/oauth2adapt v0.2.3 h1:MlxF+Pd3OmSudg/b1yZ5lJwoXCEaeedAguodky1PcKI=
cloud.google.com/go/auth/oauth2adapt v0.2.3/go.mod h1:tMQXOfZzFuNuUxOypHlQEXgdfX5cuhwU+ffUuXRJE8I=
cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY=
Expand Down Expand Up @@ -393,10 +393,10 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os=
github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo=
github.com/onsi/ginkgo/v2 v2.20.0 h1:PE84V2mHqoT1sglvHc8ZdQtPcwmvvt29WLEEO3xmdZw=
github.com/onsi/ginkgo/v2 v2.20.0/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI=
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
Expand Down Expand Up @@ -659,8 +659,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.190.0 h1:ASM+IhLY1zljNdLu19W1jTmU6A+gMk6M46Wlur61s+Q=
google.golang.org/api v0.190.0/go.mod h1:QIr6I9iedBLnfqoD6L6Vze1UvS5Hzj5r2aUBOaZnLHo=
google.golang.org/api v0.191.0 h1:cJcF09Z+4HAB2t5qTQM1ZtfL/PemsLFkcFG67qq2afk=
google.golang.org/api v0.191.0/go.mod h1:tD5dsFGxFza0hnQveGfVk9QQYKcfp+VzgRqyXFxE0+E=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw=
Expand Down

0 comments on commit 9939d44

Please sign in to comment.