Skip to content

Commit

Permalink
Add cleanup routines for underlying http transport
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstucki committed Dec 19, 2024
1 parent 113b964 commit 61c99de
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
29 changes: 27 additions & 2 deletions rpadmin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type AdminAPI struct {
urls []string
brokerIDToUrlsMutex sync.Mutex
brokerIDToUrls map[int]string
transport *http.Transport
retryClient *pester.Client
oneshotClient *http.Client
auth Auth
Expand Down Expand Up @@ -180,18 +181,19 @@ func newAdminAPI(urls []string, auth Auth, tlsConfig *tls.Config, dialer DialCon
for _, opt := range opts {
opt.apply(client)
}
transport := defaultTransport()

a := &AdminAPI{
urls: make([]string, len(urls)),
retryClient: client,
oneshotClient: &http.Client{Timeout: 10 * time.Second},
auth: auth,
transport: transport,
tlsConfig: tlsConfig,
brokerIDToUrls: make(map[int]string),
forCloud: forCloud,
}

transport := &http.Transport{}

if tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}
Expand Down Expand Up @@ -781,3 +783,26 @@ func (a *AdminAPI) UpdateAPIUrlsFromKubernetesDNS() error {

return a.initURLs(urls, a.tlsConfig, a.forCloud)
}

// Close closes all idle connections of the underlying transport
// this should be called when an admin client is no longer in-use
// in order to not leak connections from the underlying transport
// pool.
func (a *AdminAPI) Close() {
a.transport.CloseIdleConnections()
}

func defaultTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
}
70 changes: 70 additions & 0 deletions rpadmin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,73 @@ func TestUpdateAPIUrlsFromKubernetesDNS(t *testing.T) {
})
}
}

func TestIdleConnectionClosure(t *testing.T) {
clients := 1000
numRequests := 10

urls := []string{}
for id := 0; id < 3; id++ {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasPrefix(r.URL.Path, "/v1/node_config"):
w.Write([]byte(fmt.Sprintf(`{"node_id": %d}`, id))) //nolint:gocritic // original rpk code
case strings.HasPrefix(r.URL.Path, "/v1/partitions/redpanda/controller/0"):
w.Write([]byte(`{"leader_id": 0}`)) //nolint:gocritic // original rpk code

Check failure on line 394 in rpadmin/admin_test.go

View workflow job for this annotation

GitHub Actions / lint

directive `//nolint:gocritic // original rpk code` is unused for linter "gocritic" (nolintlint)
}
}))

t.Cleanup(server.Close)

urls = append(urls, server.URL)
}

// tracker to make sure we close all of our connections
var mutex sync.RWMutex
conns := []*wrappedConnection{}

for i := 0; i < clients; i++ {
// initialize a new client and do some requests
adminClient, err := NewAdminAPIWithDialer(urls, new(NopAuth), nil, func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := (&net.Dialer{}).DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
mutex.Lock()
defer mutex.Unlock()

wrapped := &wrappedConnection{Conn: conn}
conns = append(conns, wrapped)
return wrapped, nil
})
require.NoError(t, err)

for i := 0; i < numRequests; i++ {
_, err = adminClient.GetLeaderID(context.Background())
require.NoError(t, err)
}

adminClient.Close()
}

mutex.RLock()
defer mutex.RUnlock()

closed := 0
for _, conn := range conns {
if conn.closed {
closed++
}
}
require.Equal(t, closed, len(conns), "Not all connections were closed")
}

type wrappedConnection struct {
net.Conn
closed bool
}

func (w *wrappedConnection) Close() error {
w.closed = true
return w.Conn.Close()
}

0 comments on commit 61c99de

Please sign in to comment.