Skip to content

Commit

Permalink
added alerting for quiz kyc; added code infra for getting config from…
Browse files Browse the repository at this point in the history
… a remote url, for social kycs
  • Loading branch information
ice-ares committed Jan 16, 2024
1 parent 10afbee commit 29c78a1
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 9 deletions.
4 changes: 4 additions & 0 deletions application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ wintr/connectors/storage/v2: &db
replicaURLs:
- postgresql://root:pass@localhost:5433/eskimo
kyc/social:
config-json-url: https://somewhere.com/something/somebogus.json
environment: local
enable-alerts: false
alert-slack-webhook: https://hooks.slack.com/services/dummy/dummy/dummy
Expand Down Expand Up @@ -68,6 +69,9 @@ kyc/social:
6: https://www.facebook.com/reel/1463272597773681
allow-long-live-tokens: true
kyc/quiz:
environment: local
enable-alerts: false
alert-slack-webhook: https://hooks.slack.com/services/dummy/dummy/dummy
wintr/connectors/storage/v2: *db
maxSessionDurationSeconds: 600
maxQuestionsPerSession: 3
Expand Down
15 changes: 15 additions & 0 deletions kyc/quiz/DDL.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ create table if not exists failed_quiz_sessions
primary key (user_id, started_at)
);

CREATE INDEX IF NOT EXISTS failed_quiz_sessions_lookup1_ix ON failed_quiz_sessions (ended_at DESC);

create table if not exists quiz_sessions
(
started_at timestamp not null,
Expand All @@ -33,3 +35,16 @@ create table if not exists quiz_sessions
user_id text primary key references users (id) ON DELETE CASCADE,
language text not null
);

CREATE INDEX IF NOT EXISTS quiz_sessions_lookup1_ix ON quiz_sessions (ended_successfully,ended_at DESC NULLS LAST);

CREATE TABLE IF NOT EXISTS quiz_alerts (
last_alert_at timestamp NOT NULL,
frequency_in_seconds bigint NOT NULL DEFAULT 300,
pk smallint NOT NULL PRIMARY KEY)
WITH (FILLFACTOR = 70);

insert into quiz_alerts (last_alert_at, pk)
VALUES (current_timestamp, 1)
ON CONFLICT (pk)
DO NOTHING;
14 changes: 10 additions & 4 deletions kyc/quiz/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
_ "embed"
"io"
"mime/multipart"
"sync/atomic"
stdlibtime "time"

"github.com/ice-blockchain/eskimo/users"
"github.com/ice-blockchain/wintr/connectors/storage/v2"
Expand Down Expand Up @@ -104,9 +106,13 @@ type (
config
}
config struct {
MaxSessionDurationSeconds int `yaml:"maxSessionDurationSeconds"`
MaxQuestionsPerSession int `yaml:"maxQuestionsPerSession"`
MaxWrongAnswersPerSession int `yaml:"maxWrongAnswersPerSession"`
SessionCoolDownSeconds int `yaml:"sessionCoolDownSeconds"`
alertFrequency *atomic.Pointer[stdlibtime.Duration]
Environment string `yaml:"environment" mapstructure:"environment"`
AlertSlackWebhook string `yaml:"alert-slack-webhook" mapstructure:"alert-slack-webhook"` //nolint:tagliatelle // .
MaxSessionDurationSeconds int `yaml:"maxSessionDurationSeconds"`
MaxQuestionsPerSession int `yaml:"maxQuestionsPerSession"`
MaxWrongAnswersPerSession int `yaml:"maxWrongAnswersPerSession"`
SessionCoolDownSeconds int `yaml:"sessionCoolDownSeconds"`
EnableAlerts bool `yaml:"enable-alerts" mapstructure:"enable-alerts"` //nolint:tagliatelle // .
}
)
18 changes: 13 additions & 5 deletions kyc/quiz/quiz.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package quiz
import (
"context"
"fmt"
"sync/atomic"
stdlibtime "time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -33,6 +34,10 @@ func mustLoadConfig() config {
panic("session_cool_down_seconds is not set")
}

defaultAlertFrequency := alertFrequency
cfg.alertFrequency = new(atomic.Pointer[stdlibtime.Duration])
cfg.alertFrequency.Store(&defaultAlertFrequency)

return cfg
}

Expand All @@ -45,7 +50,10 @@ func newError(msg string) error {
}

func NewRepository(ctx context.Context, userRepo UserRepository) Repository {
return newRepositoryImpl(ctx, userRepo)
repo := newRepositoryImpl(ctx, userRepo)
go repo.startAlerter(ctx)

return repo
}

func newRepositoryImpl(ctx context.Context, userRepo UserRepository) *repositoryImpl {
Expand Down Expand Up @@ -97,14 +105,14 @@ func (r *repositoryImpl) validateKycStep(user *users.User) error {
return nil
}

func (*repositoryImpl) addFailedAttempt(ctx context.Context, userID UserID, now *time.Time, tx storage.Execer) error {
func (*repositoryImpl) addFailedAttempt(ctx context.Context, userID UserID, now *time.Time, tx storage.Execer, skipped bool) error {
// $1: user_id.
// $2: now.
const stmt = `
insert into failed_quiz_sessions (started_at, ended_at, questions, answers, language, user_id, skipped)
values ($2, $2, '{}', '{}', 'en', $1, true)
values ($2, $2, '{}', '{}', 'en', $1, $2)
`
_, err := storage.Exec(ctx, tx, stmt, userID, now.Time)
_, err := storage.Exec(ctx, tx, stmt, userID, now.Time, skipped)

return errors.Wrap(err, "failed to add failed attempt")
}
Expand Down Expand Up @@ -135,7 +143,7 @@ func (r *repositoryImpl) SkipQuizSession(ctx context.Context, userID UserID) err
}](ctx, tx, stmt, userID)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
err = r.addFailedAttempt(ctx, userID, now, tx)
err = r.addFailedAttempt(ctx, userID, now, tx, true)
if err == nil {
err = r.modifyUser(ctx, false, now, userID)
}
Expand Down
167 changes: 167 additions & 0 deletions kyc/quiz/slack_alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// SPDX-License-Identifier: ice License 1.0

package quiz

import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
stdlibtime "time"

"github.com/goccy/go-json"
"github.com/pkg/errors"

"github.com/ice-blockchain/wintr/connectors/storage/v2"
"github.com/ice-blockchain/wintr/log"
"github.com/ice-blockchain/wintr/time"
)

const (
alertFrequency = 5 * stdlibtime.Minute
)

func (r *repositoryImpl) startAlerter(ctx context.Context) {
if !r.config.EnableAlerts {
log.Info("unsuccessfulKYCSteps alerts not enabled")

return
} else if r.config.AlertSlackWebhook == "" || r.config.Environment == "" {
log.Panic("`alert-slack-webhook` is missing")
}
ticker := stdlibtime.NewTicker(*r.config.alertFrequency.Load())
defer ticker.Stop()

for {
select {
case <-ticker.C:
const deadline = stdlibtime.Minute
reqCtx, cancel := context.WithTimeout(ctx, deadline)
log.Error(errors.Wrap(r.sendAlertToSlack(reqCtx, ticker), "failed to sendAlertToSlack"))
cancel()
case <-ctx.Done():
return
}
}
}

//nolint:funlen // .
func (r *repositoryImpl) sendAlertToSlack(ctx context.Context, ticker *stdlibtime.Ticker) error {
return storage.DoInTransaction(ctx, r.DB, func(conn storage.QueryExecer) error { //nolint:wrapcheck // Not needed.
sql := `SELECT last_alert_at,
frequency_in_seconds
FROM quiz_alerts
WHERE pk = 1
FOR UPDATE`
alert, err := storage.Get[struct {
LastAlertAt *time.Time `db:"last_alert_at"`
FrequencyInSeconds uint64 `db:"frequency_in_seconds"`
}](ctx, conn, sql)
if err != nil {
return errors.Wrap(err, "failed to lock quiz_alerts")
}
if time.Now().Sub(*alert.LastAlertAt.Time) < stdlibtime.Duration(float64(r.config.alertFrequency.Load().Nanoseconds())*0.8) { //nolint:gomnd // .
return nil
}
if newFrequency := stdlibtime.Duration(alert.FrequencyInSeconds) * stdlibtime.Second; newFrequency != *r.config.alertFrequency.Load() {
r.config.alertFrequency.Store(&newFrequency)
ticker.Reset(newFrequency)
}

sql = `SELECT (CASE
WHEN skipped
THEN 'skipped'
WHEN EXTRACT(EPOCH FROM (ended_at - started_at)) >= $2
THEN 'expired'
WHEN cardinality(answers) < $3
THEN 'failed'
ELSE 'unknown'
END) AS mapped_reason,
count(1) AS counter
FROM failed_quiz_sessions
WHERE ended_at >= $1
GROUP BY 1
UNION ALL
SELECT 'success' AS mapped_reason,
count(1) AS counter
FROM quiz_sessions
WHERE ended_successfully IS TRUE
AND ended_at IS NOT NULL
AND ended_at >= $1`
stats, err := storage.Select[quizStats](ctx, conn, sql, alert.LastAlertAt.Time, r.config.MaxSessionDurationSeconds, r.config.MaxQuestionsPerSession-r.config.MaxWrongAnswersPerSession) //nolint:lll // .
if err != nil {
return errors.Wrap(err, "failed to select stats")
}

if err = r.sendSlackMessage(ctx, stats); err != nil {
return errors.Wrap(err, "failed to sendSlackMessage")
}

sql = `UPDATE quiz_alerts
SET last_alert_at = $1
WHERE pk = 1`
updatedRows, err := storage.Exec(ctx, conn, sql, time.Now().Time)
if err != nil {
return errors.Wrap(err, "update last_alert_at to now failed")
}
if updatedRows == 0 {
return errors.New("unexpected 0 updatedRows")
}

return nil
})
}

type (
quizStats struct {
Reason string `db:"mapped_reason" json:"reason"`
Counter uint64 `db:"counter" json:"counter"`
}
)

//nolint:funlen // .
func (r *repositoryImpl) sendSlackMessage(ctx context.Context, stats []*quizStats) error {
if len(stats) == 0 {
return nil
}
rows := make([]string, 0, len(stats))
var hasFailures bool
for _, stat := range stats {
if stat.Reason != "success" && stat.Counter > 0 {
hasFailures = true
}
if stat.Counter == 0 {
continue
}
rows = append(rows, fmt.Sprintf("`%v`: `%v`", stat.Reason, stat.Counter))
}
if !hasFailures || len(rows) == 0 {
return nil
}
message := struct {
Text string `json:"text,omitempty"`
}{
Text: fmt.Sprintf("[%v]kycStep [4], stats:\n%v", r.config.Environment, strings.Join(rows, "\n")),
}
data, err := json.Marshal(message)
if err != nil {
return errors.Wrapf(err, "failed to Marshal slack message:%#v", message)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.config.AlertSlackWebhook, bytes.NewBuffer(data))
if err != nil {
return errors.Wrap(err, "newRequestWithContext failed")
}

resp, err := new(http.Client).Do(req)
if err != nil {
return errors.Wrap(err, "slack webhook request failed")
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("unexpected statusCode:%v", resp.StatusCode)
}

return errors.Wrap(resp.Body.Close(), "failed to close body")
}
16 changes: 16 additions & 0 deletions kyc/social/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"mime/multipart"
"sync"
"sync/atomic"
"text/template"
stdlibtime "time"

Expand Down Expand Up @@ -79,6 +80,8 @@ type (

const (
applicationYamlKey = "kyc/social"

requestDeadline = 25 * stdlibtime.Second
)

const (
Expand Down Expand Up @@ -114,11 +117,24 @@ type (
cfg *config
db *storage.DB
}

kycConfigJSON struct {
Social1KYC struct{} `json:"social1-kyc"` //nolint:tagliatelle // .
Social2KYC struct{} `json:"social2-kyc"` //nolint:tagliatelle // .
WebSocial1KYC struct{} `json:"web-social1-kyc"` //nolint:tagliatelle // .
WebSocial2KYC struct{} `json:"web-social2-kyc"` //nolint:tagliatelle // .
DynamicDistributionSocialKYC []*struct {
KYCStep users.KYCStep `json:"step"` //nolint:tagliatelle // .
} `json:"dynamic-distribution-kyc"` //nolint:tagliatelle // .
}

config struct {
alertFrequency *sync.Map // .map[users.KYCStep]stdlibtime.Duration.
kycConfigJSON *atomic.Pointer[kycConfigJSON]
SocialLinks map[Type]struct {
PostURLs map[users.KYCStep]string `yaml:"post-urls" mapstructure:"post-urls"` //nolint:tagliatelle // .
} `yaml:"social-links" mapstructure:"social-links"` //nolint:tagliatelle // .
ConfigJSONURL string `yaml:"config-json-url" mapstructure:"config-json-url"` //nolint:tagliatelle // .
Environment string `yaml:"environment" mapstructure:"environment"`
AlertSlackWebhook string `yaml:"alert-slack-webhook" mapstructure:"alert-slack-webhook"` //nolint:tagliatelle // .
DelayBetweenSessions stdlibtime.Duration `yaml:"delay-between-sessions" mapstructure:"delay-between-sessions"` //nolint:tagliatelle // .
Expand Down
Loading

0 comments on commit 29c78a1

Please sign in to comment.