Skip to content

Commit

Permalink
fix(admin): add retries for GroupCoordinator errors
Browse files Browse the repository at this point in the history
- retry admin operations that rely on talking to the group coordinator
  for a given ID if the coordinator has changed from the cached value,
  or is not available
- also fix up the controller retry in ElectLeaders to use the error from the
  response rather than the one returned by sendAndReceive
- also rename isErrNotController to isRetriableControllerError for
  consistency

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Jan 5, 2025
1 parent 1358013 commit f9a6799
Showing 1 changed file with 122 additions and 70 deletions.
192 changes: 122 additions & 70 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,19 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) {
return ca.client.RefreshController()
}

// isErrNotController returns `true` if the given error type unwraps to an
// isRetriableControllerError returns `true` if the given error type unwraps to an
// `ErrNotController` response from Kafka
func isErrNotController(err error) bool {
func isRetriableControllerError(err error) bool {
return errors.Is(err, ErrNotController)
}

// isRetriableGroupCoordinatorError reports whether err unwraps to one of the
// two group coordinator errors checked below: `ErrNotCoordinatorForConsumer`
// or `ErrConsumerCoordinatorNotAvailable`.
func isRetriableGroupCoordinatorError(err error) bool {
	if errors.Is(err, ErrNotCoordinatorForConsumer) {
		return true
	}
	return errors.Is(err, ErrConsumerCoordinatorNotAvailable)
}

// retryOnError will repeatedly call the given (error-returning) func in the
// case that its response is non-nil and retryable (as determined by the
// provided retryable func) up to the maximum number of tries permitted by
Expand Down Expand Up @@ -252,7 +259,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
request.Version = 1
}

return ca.retryOnError(isErrNotController, func() error {
return ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
Expand All @@ -269,7 +276,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
}

if !errors.Is(topicErr.Err, ErrNoError) {
if errors.Is(topicErr.Err, ErrNotController) {
if isRetriableControllerError(topicErr.Err) {
_, _ = ca.refreshController()
}
return topicErr
Expand All @@ -281,14 +288,17 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
var response *MetadataResponse
err = ca.retryOnError(isErrNotController, func() error {
err = ca.retryOnError(isRetriableControllerError, func() error {
controller, err := ca.Controller()
if err != nil {
return err
}
request := NewMetadataRequest(ca.conf.Version, topics)
response, err = controller.GetMetadata(request)
if isErrNotController(err) {
if err != nil {
return err
}
if isRetriableControllerError(err) {
_, _ = ca.refreshController()
}
return err
Expand All @@ -301,15 +311,15 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
var response *MetadataResponse
err = ca.retryOnError(isErrNotController, func() error {
err = ca.retryOnError(isRetriableControllerError, func() error {
controller, err := ca.Controller()
if err != nil {
return err
}

request := NewMetadataRequest(ca.conf.Version, nil)
response, err = controller.GetMetadata(request)
if isErrNotController(err) {
if isRetriableControllerError(err) {
_, _ = ca.refreshController()
}
return err
Expand Down Expand Up @@ -441,7 +451,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
request.Version = 1
}

return ca.retryOnError(isErrNotController, func() error {
return ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
Expand Down Expand Up @@ -485,7 +495,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
request.Version = 1
}

return ca.retryOnError(isErrNotController, func() error {
return ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
Expand Down Expand Up @@ -526,7 +536,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][
request.AddBlock(topic, int32(i), assignment[i])
}

return ca.retryOnError(isErrNotController, func() error {
return ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
Expand Down Expand Up @@ -573,15 +583,15 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in
request.AddBlock(topic, partitions)

var rsp *ListPartitionReassignmentsResponse
err = ca.retryOnError(isErrNotController, func() error {
err = ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
}
_ = b.Open(ca.client.Config())

rsp, err = b.ListPartitionReassignments(request)
if isErrNotController(err) {
if isRetriableControllerError(err) {
_, _ = ca.refreshController()
}
return err
Expand Down Expand Up @@ -924,20 +934,25 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s
}

var res *ElectLeadersResponse
err := ca.retryOnError(isErrNotController, func() error {
if err := ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
}
_ = b.Open(ca.client.Config())

res, err = b.ElectLeaders(request)
if isErrNotController(err) {
_, _ = ca.refreshController()
if err != nil {
return err
}
return err
})
if err != nil {
if !errors.Is(res.ErrorCode, ErrNoError) {
if isRetriableControllerError(res.ErrorCode) {
_, _ = ca.refreshController()
}
return res.ErrorCode
}
return nil
}); err != nil {
return nil, err
}
return res.ReplicaElectionResults, nil
Expand All @@ -947,11 +962,11 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group
groupsPerBroker := make(map[*Broker][]string)

for _, group := range groups {
controller, err := ca.client.Coordinator(group)
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
}

for broker, brokerGroups := range groupsPerBroker {
Expand Down Expand Up @@ -1043,72 +1058,96 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}

var response *OffsetFetchResponse
request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)

return coordinator.FetchOffset(request)
err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}
response, err = coordinator.FetchOffset(request)
if err != nil {
return err
}
if !errors.Is(response.Err, ErrNoError) {
if isRetriableGroupCoordinatorError(response.Err) {
_ = ca.client.RefreshCoordinator(group)
}
return response.Err
}
return nil
})
return response, err
}

func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

var response *DeleteOffsetsResponse
request := &DeleteOffsetsRequest{
Group: group,
partitions: map[string][]int32{
topic: {partition},
},
}

resp, err := coordinator.DeleteOffsets(request)
if err != nil {
return err
}
return ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

if !errors.Is(resp.ErrorCode, ErrNoError) {
return resp.ErrorCode
}
response, err = coordinator.DeleteOffsets(request)
if err != nil {
return err
}

if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
return resp.Errors[topic][partition]
}
return nil
if !errors.Is(response.ErrorCode, ErrNoError) {
if isRetriableGroupCoordinatorError(response.ErrorCode) {
_ = ca.client.RefreshCoordinator(group)
}
return response.ErrorCode
}

if !errors.Is(response.Errors[topic][partition], ErrNoError) {
return response.Errors[topic][partition]
}

return nil
})
}

func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

var response *DeleteGroupsResponse
request := &DeleteGroupsRequest{
Groups: []string{group},
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}

resp, err := coordinator.DeleteGroups(request)
if err != nil {
return err
}
return ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}
response, err = coordinator.DeleteGroups(request)
if err != nil {
return err
}

groupErr, ok := resp.GroupErrorCodes[group]
if !ok {
return ErrIncompleteResponse
}
groupErr, ok := response.GroupErrorCodes[group]
if !ok {
return ErrIncompleteResponse
}

if !errors.Is(groupErr, ErrNoError) {
return groupErr
}
if !errors.Is(groupErr, ErrNoError) {
if isRetriableGroupCoordinatorError(groupErr) {
_ = ca.client.RefreshCoordinator(group)
}
return groupErr
}

return nil
return nil
})
}

func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
Expand Down Expand Up @@ -1206,7 +1245,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU
}

var rsp *AlterUserScramCredentialsResponse
err := ca.retryOnError(isErrNotController, func() error {
err := ca.retryOnError(isRetriableControllerError, func() error {
b, err := ca.Controller()
if err != nil {
return err
Expand Down Expand Up @@ -1284,24 +1323,37 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
return nil
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
}

controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
}
var response *LeaveGroupResponse
request := &LeaveGroupRequest{
Version: 3,
GroupId: groupId,
GroupId: group,
}
for _, instanceId := range groupInstanceIds {
groupInstanceId := instanceId
request.Members = append(request.Members, MemberIdentity{
GroupInstanceId: &groupInstanceId,
})
}
return controller.LeaveGroup(request)
err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}
response, err = coordinator.LeaveGroup(request)
if err != nil {
return err
}
if !errors.Is(response.Err, ErrNoError) {
if isRetriableGroupCoordinatorError(response.Err) {
_ = ca.client.RefreshCoordinator(group)
}
return response.Err
}
return nil
})
return response, err
}

0 comments on commit f9a6799

Please sign in to comment.