Skip to content

Commit

Permalink
Initialize connectors in parallel
Browse files Browse the repository at this point in the history
This is required to initialize connectors faster and have a separate backoff limit.

Also in this PR:
* Wait for connectors initialization on test server startup
* Show only initialized connector on the login page

Signed-off-by: m.nabokikh <[email protected]>
  • Loading branch information
nabokihms committed Jan 16, 2024
1 parent 9a55acc commit 563e90a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 49 deletions.
50 changes: 23 additions & 27 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,35 @@ func (s *Server) handleAuthorization(w http.ResponseWriter, r *http.Request) {
return
}

connectorID := r.Form.Get("connector_id")
// Construct a URL with all of the arguments in its query
connURL := url.URL{
RawQuery: r.Form.Encode(),
}

connectors, err := s.storage.ListConnectors()
if err != nil {
s.logger.Errorf("Failed to get list of connectors: %v", err)
s.renderError(r, w, http.StatusInternalServerError, "Failed to retrieve connector list.")
return
s.mu.Lock()
connectorsLen := len(s.connectors)
connectorInfos := make([]connectorInfo, 0, connectorsLen)

for id, conn := range s.connectors {
connURL.Path = s.absPath("/auth", url.PathEscape(id))
connectorInfos = append(connectorInfos, connectorInfo{
ID: id,
Name: conn.Name,
Type: conn.Type,
URL: template.URL(connURL.String()),
})
}
s.mu.Unlock()

connectorID := r.Form.Get("connector_id")
// We don't need connector_id any more
r.Form.Del("connector_id")

// Construct a URL with all of the arguments in its query
connURL := url.URL{
RawQuery: r.Form.Encode(),
}

// Redirect if a client chooses a specific connector_id
if connectorID != "" {
for _, c := range connectors {
if c.ID == connectorID {
connURL.Path = s.absPath("/auth", url.PathEscape(c.ID))
for _, info := range connectorInfos {
if info.ID == connectorID {
connURL.Path = s.absPath("/auth", url.PathEscape(info.ID))
http.Redirect(w, r, connURL.String(), http.StatusFound)
return
}
Expand All @@ -165,22 +172,11 @@ func (s *Server) handleAuthorization(w http.ResponseWriter, r *http.Request) {
return
}

if len(connectors) == 1 && !s.alwaysShowLogin {
connURL.Path = s.absPath("/auth", url.PathEscape(connectors[0].ID))
if len(connectorInfos) == 1 && !s.alwaysShowLogin {
connURL.Path = s.absPath("/auth", url.PathEscape(connectorInfos[0].ID))
http.Redirect(w, r, connURL.String(), http.StatusFound)
}

connectorInfos := make([]connectorInfo, len(connectors))
for index, conn := range connectors {
connURL.Path = s.absPath("/auth", url.PathEscape(conn.ID))
connectorInfos[index] = connectorInfo{
ID: conn.ID,
Name: conn.Name,
Type: conn.Type,
URL: template.URL(connURL.String()),
}
}

if err := s.templates.login(r, w, connectorInfos); err != nil {
s.logger.Errorf("Server template error: %v", err)
}
Expand Down
40 changes: 18 additions & 22 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package server

import (
"container/list"
"context"
"crypto/rsa"
"encoding/json"
Expand Down Expand Up @@ -56,6 +55,8 @@ const LocalConnector = "local"

// Connector is a connector with resource version metadata.
type Connector struct {
Type string
Name string
ResourceVersion string
Connector connector.Connector
}
Expand Down Expand Up @@ -643,6 +644,8 @@ func (s *Server) OpenConnector(conn storage.Connector) (Connector, error) {
}

connector := Connector{
Type: conn.Type,
Name: conn.Name,
ResourceVersion: conn.ResourceVersion,
Connector: c,
}
Expand Down Expand Up @@ -687,35 +690,28 @@ func (s *Server) getConnector(id string) (Connector, error) {
func (s *Server) InitializeConnectors(connectors []storage.Connector) {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 0
b.InitialInterval = 100 * time.Millisecond
b.InitialInterval = 500 * time.Millisecond
b.MaxInterval = 30 * time.Second

limiter := backoff.NewTicker(b)

s.logger.Info("start initializing connectors")

queue := list.New()
for _, c := range connectors {
queue.PushBack(c)
}
go func(conn storage.Connector) {
limiter := backoff.NewTicker(b)

go func() {
for queue.Len() > 0 {
conn := queue.Remove(queue.Front()).(storage.Connector)
for {
s.logger.Debugf("initializing %q connector", conn.ID)

s.logger.Debugf("initializing %q connector", conn.ID)
_, err := s.OpenConnector(conn)
if err == nil {
break
}

_, err := s.OpenConnector(conn)
if err == nil {
s.logger.Debugf("connector %q has been initialized successfully", conn.ID)
continue
s.logger.Error(err)
<-limiter.C // Wait for the next retry only on fails
}
s.logger.Errorf("failed to open connector: %v", err)

queue.PushBack(conn)
<-limiter.C // Wait for the next retry only on fails
}

s.logger.Info("all connectors have been initialized")
}()
s.logger.Debugf("connector %q has been initialized successfully", conn.ID)
}(c)
}
}
23 changes: 23 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,27 @@ func newTestServer(ctx context.Context, t *testing.T, updateConfig func(c *Confi
server.refreshTokenPolicy.now = config.Now
}

// Wait for connectors initialization
ticker := time.NewTicker(time.Millisecond)

attempts := 0
for {
attempts++
if attempts > 1000 {
t.Fatal("failed waiting for connectors initialization")
}
<-ticker.C

server.mu.Lock()
containersLen := len(server.connectors)
server.mu.Unlock()

if containersLen > 0 {
break
}
}
ticker.Stop()

return s, server
}

Expand Down Expand Up @@ -1622,7 +1643,9 @@ func TestOAuth2DeviceFlow(t *testing.T) {
})
defer httpServer.Close()

s.mu.Lock()
mockConn := s.connectors["mock"]
s.mu.Unlock()
conn = mockConn.Connector.(*mock.Callback)

p, err := oidc.NewProvider(ctx, httpServer.URL)
Expand Down

0 comments on commit 563e90a

Please sign in to comment.