From 6431f492b960b19566fedd5e6af4c448d184f5a3 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 7 Jan 2025 09:48:59 +0000 Subject: [PATCH] fix(admin): use named return on retryOnError func Use a named return for the defer func to inspect to avoid subtle bugs Signed-off-by: Dominic Evans --- admin.go | 64 ++++++++++++++++++++++++++------------------------------ 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/admin.go b/admin.go index 6f39b5b01..8aa1f374e 100644 --- a/admin.go +++ b/admin.go @@ -1066,25 +1066,24 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) { var response *OffsetFetchResponse request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) - err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.FetchOffset(request) if err != nil { return err } if !errors.Is(response.Err, ErrNoError) { - err = response.Err - return err + return response.Err } return nil @@ -1102,29 +1101,27 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa }, } - return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.DeleteOffsets(request) if err != nil { return err } if !errors.Is(response.ErrorCode, ErrNoError) { - err = response.ErrorCode - return err + return response.ErrorCode } if !errors.Is(response.Errors[topic][partition], ErrNoError) { - err = response.Errors[topic][partition] - return err + return response.Errors[topic][partition] } return nil @@ -1140,18 +1137,18 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request.Version = 1 } - return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.DeleteGroups(request) if err != nil { return err @@ -1163,8 +1160,7 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { } if !errors.Is(groupErr, ErrNoError) { - err = groupErr - return err + return groupErr } return nil @@ -1359,28 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanc GroupInstanceId: &groupInstanceId, }) } - err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.LeaveGroup(request) if err != nil { return err } if !errors.Is(response.Err, ErrNoError) { - err = response.Err - return err + return response.Err } return nil }) + return response, err }