From 1864588826de8e200c455fbeee136804992c33b0 Mon Sep 17 00:00:00 2001 From: Cronus <105345303+ice-cronus@users.noreply.github.com> Date: Wed, 26 Jun 2024 20:08:10 +0700 Subject: [PATCH] make face load balancer more atomic, not to get spikes when face free (#164) --- cmd/eskimo-hut/api/docs.go | 6 ++++ cmd/eskimo-hut/api/swagger.json | 6 ++++ cmd/eskimo-hut/api/swagger.yaml | 4 +++ cmd/eskimo-hut/kyc.go | 1 + kyc/face/face.go | 13 ++++---- kyc/face/internal/contract.go | 2 +- kyc/face/internal/threedivi/contract.go | 6 ++-- kyc/face/internal/threedivi/threedivi.go | 38 +++++++++++++++++++----- 8 files changed, 57 insertions(+), 19 deletions(-) diff --git a/cmd/eskimo-hut/api/docs.go b/cmd/eskimo-hut/api/docs.go index 8050be06..df389225 100644 --- a/cmd/eskimo-hut/api/docs.go +++ b/cmd/eskimo-hut/api/docs.go @@ -1321,6 +1321,12 @@ const docTemplate = `{ "description": "the kyc steps you wish to skip", "name": "skipKYCSteps", "in": "query" + }, + { + "type": "integer", + "description": "the kyc step which would be next", + "name": "nextKYCStep", + "in": "query" } ], "responses": { diff --git a/cmd/eskimo-hut/api/swagger.json b/cmd/eskimo-hut/api/swagger.json index 520052b2..26e8a769 100644 --- a/cmd/eskimo-hut/api/swagger.json +++ b/cmd/eskimo-hut/api/swagger.json @@ -1314,6 +1314,12 @@ "description": "the kyc steps you wish to skip", "name": "skipKYCSteps", "in": "query" + }, + { + "type": "integer", + "description": "the kyc step which would be next", + "name": "nextKYCStep", + "in": "query" } ], "responses": { diff --git a/cmd/eskimo-hut/api/swagger.yaml b/cmd/eskimo-hut/api/swagger.yaml index 522fb40d..01a849d3 100644 --- a/cmd/eskimo-hut/api/swagger.yaml +++ b/cmd/eskimo-hut/api/swagger.yaml @@ -1663,6 +1663,10 @@ paths: type: integer name: skipKYCSteps type: array + - description: the kyc step which would be next + in: query + name: nextKYCStep + type: integer produces: - application/json responses: diff --git a/cmd/eskimo-hut/kyc.go b/cmd/eskimo-hut/kyc.go index 8c415d89..aaab83f6 100644 --- a/cmd/eskimo-hut/kyc.go +++ b/cmd/eskimo-hut/kyc.go @@ -245,6 +245,7 @@ func validateVerifySocialKYCStep(req *server.Request[kycsocial.VerificationMetad // @Param X-Account-Metadata header string false "Insert your metadata token" default() // @Param userId path string true "ID of the user" // @Param skipKYCSteps query []int false "the kyc steps you wish to skip" collectionFormat(multi) +// @Param nextKYCStep query int false "the kyc step which would be next" // @Success 200 {object} User // @Failure 400 {object} server.ErrorResponse "if validations fail" // @Failure 401 {object} server.ErrorResponse "if not authorized" diff --git a/kyc/face/face.go b/kyc/face/face.go index 0f4e6b45..a5179914 100644 --- a/kyc/face/face.go +++ b/kyc/face/face.go @@ -24,7 +24,7 @@ func New(ctx context.Context, usersRep UserRepository) Client { cfg.UnexpectedErrorsAllowed = 5 } db := storage.MustConnect(ctx, ddl, applicationYamlKey) - cl := &client{client: threedivi.New3Divi(usersRep, &cfg.ThreeDiVi), cfg: cfg, db: db} + cl := &client{client: threedivi.New3Divi(ctx, usersRep, &cfg.ThreeDiVi), cfg: cfg, db: db} go cl.clearErrs(ctx) return cl @@ -56,19 +56,16 @@ func (c *client) CheckStatus(ctx context.Context, user *users.User, nextKYCStep } } } - //nolint:nestif // . if !hasResult || nextKYCStep == users.LivenessDetectionKYCStep { - availabilityErr := c.client.Available(ctx) + availabilityErr := c.client.Available(ctx, userWasPreviouslyForwardedToFaceKYC) if availabilityErr == nil { kycFaceAvailable = true if fErr := c.saveUserForwarded(ctx, user.ID); fErr != nil { return false, errors.Wrapf(fErr, "failed ") } - } else { - if !errors.Is(availabilityErr, internal.ErrNotAvailable) { - c.unexpectedErrors.Add(1) - } - log.Error(errors.Wrapf(err, "[unexpected]face auth is unavailable for userID %v KYCStep %v", user.ID, nextKYCStep)) + } else if !errors.Is(availabilityErr, internal.ErrNotAvailable) { + c.unexpectedErrors.Add(1) + log.Error(errors.Wrapf(availabilityErr, "[unexpected]face auth is unavailable for userID %v KYCStep %v", user.ID, nextKYCStep)) } } diff --git a/kyc/face/internal/contract.go b/kyc/face/internal/contract.go index ad4a22c8..5e40a18b 100644 --- a/kyc/face/internal/contract.go +++ b/kyc/face/internal/contract.go @@ -13,7 +13,7 @@ import ( type ( Client interface { - Available(ctx context.Context) error + Available(ctx context.Context, userWasPreviouslyForwardedToFaceKYC bool) error CheckAndUpdateStatus(ctx context.Context, user *users.User) (hasFaceKYCResult bool, err error) Reset(ctx context.Context, user *users.User, fetchState bool) error } diff --git a/kyc/face/internal/threedivi/contract.go b/kyc/face/internal/threedivi/contract.go index dc7b6e3d..67adfcb4 100644 --- a/kyc/face/internal/threedivi/contract.go +++ b/kyc/face/internal/threedivi/contract.go @@ -3,6 +3,7 @@ package threedivi import ( + "sync/atomic" stdlibtime "time" "github.com/pkg/errors" @@ -12,8 +13,9 @@ import ( type ( threeDivi struct { - users internal.UserRepository - cfg *Config + users internal.UserRepository + cfg *Config + loadBalancedUsersCount atomic.Uint64 } Config struct { ThreeDiVi struct { diff --git a/kyc/face/internal/threedivi/threedivi.go b/kyc/face/internal/threedivi/threedivi.go index 04aea6b9..9ad75f65 100644 --- a/kyc/face/internal/threedivi/threedivi.go +++ b/kyc/face/internal/threedivi/threedivi.go @@ -28,7 +28,7 @@ func init() { //nolint:gochecknoinits // It's the only way to tweak the client. req.DefaultClient().GetClient().Timeout = requestDeadline } -func New3Divi(usersRepository internal.UserRepository, cfg *Config) internal.Client { +func New3Divi(ctx context.Context, usersRepository internal.UserRepository, cfg *Config) internal.Client { if cfg.ThreeDiVi.BAFHost == "" { log.Panic(errors.Errorf("no baf-host for 3divi integration")) } @@ -40,13 +40,16 @@ func New3Divi(usersRepository internal.UserRepository, cfg *Config) internal.Cli } cfg.ThreeDiVi.BAFHost, _ = strings.CutSuffix(cfg.ThreeDiVi.BAFHost, "/") - return &threeDivi{ + tdv := &threeDivi{ users: usersRepository, cfg: cfg, } + go tdv.clearUsers(ctx) + + return tdv } -func (t *threeDivi) Available(ctx context.Context) error { +func (t *threeDivi) Available(ctx context.Context, userWasPreviouslyForwardedToFaceKYC bool) error { if t.cfg.ThreeDiVi.AvailabilityURL == "" { return nil } @@ -74,20 +77,39 @@ func (t *threeDivi) Available(ctx context.Context) error { } else if data, err2 := resp.ToBytes(); err2 != nil { return errors.Wrapf(err2, "failed to read body of availability of face auth") } else { //nolint:revive // . - return t.isAvailable(data) + return t.isAvailable(data, userWasPreviouslyForwardedToFaceKYC) } } -func (t *threeDivi) isAvailable(data []byte) error { +//nolint:revive // . +func (t *threeDivi) isAvailable(data []byte, userWasPreviouslyForwardedToFaceKYC bool) error { activeUsers, cErr := t.activeUsers(data) if cErr != nil { return errors.Wrapf(cErr, "failed to parse metrics of availability of face auth") } - if activeUsers+1 > t.cfg.ThreeDiVi.ConcurrentUsers { - return internal.ErrNotAvailable + if int64(t.cfg.ThreeDiVi.ConcurrentUsers)-(int64(activeUsers)+int64(t.loadBalancedUsersCount.Load())) >= 1 { + if !userWasPreviouslyForwardedToFaceKYC { + t.loadBalancedUsersCount.Add(1) + } + + return nil } - return nil + return internal.ErrNotAvailable +} + +func (t *threeDivi) clearUsers(ctx context.Context) { + ticker := stdlibtime.NewTicker(1 * stdlibtime.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + t.loadBalancedUsersCount.Store(0) + case <-ctx.Done(): + return + } + } } func (*threeDivi) activeUsers(data []byte) (int, error) {