Skip to content

Commit

Permalink
Use backoff to initialize connectors on the background
Browse files Browse the repository at this point in the history
Signed-off-by: m.nabokikh <[email protected]>
  • Loading branch information
nabokihms committed Jan 15, 2024
1 parent 665a5b6 commit 9a55acc
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/Masterminds/sprig/v3 v3.2.3
github.com/beevik/etree v1.3.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/coreos/go-oidc/v3 v3.9.0
github.com/dexidp/dex/api/v2 v2.1.0
github.com/felixge/httpsnoop v1.0.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/beevik/etree v1.3.0 h1:hQTc+pylzIKDb23yYprodCWWTt+ojFfUZyzU09a/hmU=
github.com/beevik/etree v1.3.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
71 changes: 66 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"container/list"
"context"
"crypto/rsa"
"encoding/json"
Expand All @@ -19,6 +20,8 @@ import (
"time"

gosundheit "github.com/AppsFlyer/go-sundheit"
"github.com/AppsFlyer/go-sundheit/checks"
"github.com/cenkalti/backoff/v4"
"github.com/felixge/httpsnoop"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -316,11 +319,7 @@ func newServer(ctx context.Context, c Config, rotationStrategy rotationStrategy)
return nil, errors.New("server: no connectors specified")
}

for _, conn := range storageConnectors {
if _, err := s.OpenConnector(conn); err != nil {
return nil, fmt.Errorf("server: Failed to open connector %s: %v", conn.ID, err)
}
}
s.InitializeConnectors(storageConnectors)

instrumentHandlerCounter := func(_ string, handler http.Handler) http.HandlerFunc {
return handler.ServeHTTP
Expand All @@ -345,6 +344,28 @@ func newServer(ctx context.Context, c Config, rotationStrategy rotationStrategy)
}
}

if c.HealthChecker != nil {
err = c.HealthChecker.RegisterCheck(
&checks.CustomCheck{
CheckName: "connectors",
CheckFunc: func(context.Context) (details interface{}, err error) {
s.mu.Lock()
connectorsLen := len(s.connectors)
s.mu.Unlock()

if connectorsLen > 0 {
return nil, nil
}
return nil, errors.New("no connectors configured")
},
},
gosundheit.ExecutionPeriod(500*time.Millisecond),
)
if err != nil {
return nil, fmt.Errorf("failed to register healthcheck: %v", err)
}
}

r := mux.NewRouter().SkipClean(true).UseEncodedPath()
handle := func(p string, h http.Handler) {
r.Handle(path.Join(issuerURL.Path, p), instrumentHandlerCounter(p, h))
Expand Down Expand Up @@ -658,3 +679,43 @@ func (s *Server) getConnector(id string) (Connector, error) {

return conn, nil
}

// InitializeConnectors opens all connectors in the storage and adds them to the server.
// If a connector fails to open, it will be retried until it succeeds.
//
// This method prevents dex from failing to start if a connector is temporarily unavailable.
func (s *Server) InitializeConnectors(connectors []storage.Connector) {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 0
b.InitialInterval = 100 * time.Millisecond
b.MaxInterval = 30 * time.Second

limiter := backoff.NewTicker(b)

s.logger.Info("start initializing connectors")

queue := list.New()
for _, c := range connectors {
queue.PushBack(c)
}

go func() {
for queue.Len() > 0 {
conn := queue.Remove(queue.Front()).(storage.Connector)

s.logger.Debugf("initializing %q connector", conn.ID)

_, err := s.OpenConnector(conn)
if err == nil {
s.logger.Debugf("connector %q has been initialized successfully", conn.ID)
continue
}
s.logger.Errorf("failed to open connector: %v", err)

queue.PushBack(conn)
<-limiter.C // Wait for the next retry only on fails
}

s.logger.Info("all connectors have been initialized")
}()
}
3 changes: 3 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,10 @@ func TestOAuth2CodeFlow(t *testing.T) {
})
defer httpServer.Close()

s.mu.Lock()
mockConn := s.connectors["mock"]
s.mu.Unlock()

conn = mockConn.Connector.(*mock.Callback)

// Query server's provider metadata.
Expand Down

0 comments on commit 9a55acc

Please sign in to comment.