Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Delegation Token APIs to ClusterAdmin #2736

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions acl_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
AclOperationDescribeConfigs
AclOperationAlterConfigs
AclOperationIdempotentWrite
AclOperationCreateTokens
AclOperationDescribeTokens
)

func (a *AclOperation) String() string {
Expand All @@ -47,6 +49,8 @@ func (a *AclOperation) String() string {
AclOperationDescribeConfigs: "DescribeConfigs",
AclOperationAlterConfigs: "AlterConfigs",
AclOperationIdempotentWrite: "IdempotentWrite",
AclOperationCreateTokens: "CreateTokens",
AclOperationDescribeTokens: "DescribeTokens",
}
s, ok := mapping[*a]
if !ok {
Expand Down Expand Up @@ -77,6 +81,8 @@ func (a *AclOperation) UnmarshalText(text []byte) error {
"describeconfigs": AclOperationDescribeConfigs,
"alterconfigs": AclOperationAlterConfigs,
"idempotentwrite": AclOperationIdempotentWrite,
"createtokens": AclOperationCreateTokens,
"describetokens": AclOperationDescribeTokens,
}
ao, ok := mapping[normalized]
if !ok {
Expand Down Expand Up @@ -142,6 +148,7 @@ const (
AclResourceCluster
AclResourceTransactionalID
AclResourceDelegationToken
AclResourceUser
)

func (a *AclResourceType) String() string {
Expand All @@ -153,6 +160,7 @@ func (a *AclResourceType) String() string {
AclResourceCluster: "Cluster",
AclResourceTransactionalID: "TransactionalID",
AclResourceDelegationToken: "DelegationToken",
AclResourceUser: "User",
}
s, ok := mapping[*a]
if !ok {
Expand All @@ -177,6 +185,7 @@ func (a *AclResourceType) UnmarshalText(text []byte) error {
"cluster": AclResourceCluster,
"transactionalid": AclResourceTransactionalID,
"delegationtoken": AclResourceDelegationToken,
"user": AclResourceUser,
}

art, ok := mapping[normalized]
Expand Down
119 changes: 119 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@
// This is for static membership feature. KIP-345
RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

// Creates a renewable delegation token based on the access privileges of the token's owner. If a nil argument
// is provided, the owner is assumed to be the admin client user. The renewer argument allows users other
// than the token's owner to renew/expire the token. When the maxLifetime argument is -1 milliseconds,
// the server default will be used.
CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error)

// Renews an existing delegation token (identified by its hmac), returning the new expiration time.
RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error)

// Changes the expiration time for an existing delegation token (identified by its hmac), returning the new
// expiration time. Set the period to -1 milliseconds to immediately expire a token.
ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error)

// Returns the information for all delegation tokens owned by the users supplied in the argument.
DescribeDelegationToken(owners []string) ([]RenewableToken, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -1269,3 +1285,106 @@
}
return controller.LeaveGroup(request)
}

func (ca *clusterAdmin) CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error) {

Check failure on line 1289 in admin.go

View workflow job for this annotation

GitHub Actions / Linting with Go 1.22.x

unnecessary leading newline (whitespace)

controller, err := ca.client.Controller()
if err != nil {
return nil, err
}

resource := AclResourceUser
request := &CreateDelegationTokenRequest{
Version: 2,
MaxLifetime: maxLifetime,
Renewers: make([]Principal, len(renewers)),
}

if ca.conf.Version.IsAtLeast(V3_3_0_0) {
request.Version = 3
if owner != nil && len(*owner) > 0 {
user := resource.String()
request.OwnerPrincipalType = &user
request.OwnerName = owner
}
}

for i, r := range renewers {
request.Renewers[i] = Principal{resource.String(), r}
}

rsp, err := controller.CreateDelegationToken(request)
if err != nil {
return nil, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return nil, rsp.ErrorCode
}

return &rsp.DelegationToken, nil
}

func (ca *clusterAdmin) RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error) {
var expiry time.Time
controller, err := ca.client.Controller()
if err != nil {
return expiry, err
}

request := RenewDelegationTokenRequest{Version: 2, HMAC: hmac, RenewalPeriod: period}
rsp, err := controller.RenewDelegationToken(&request)
if err != nil {
return expiry, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return expiry, rsp.ErrorCode
}

return rsp.ExpiryTime, nil
}

func (ca *clusterAdmin) ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error) {
var expiry time.Time

controller, err := ca.client.Controller()
if err != nil {
return expiry, err
}

request := ExpireDelegationTokenRequest{Version: 2, HMAC: hmac, ExpiryPeriod: period}
rsp, err := controller.ExpireDelegationToken(&request)
if err != nil {
return expiry, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return expiry, rsp.ErrorCode
}

return rsp.ExpiryTime, nil
}

func (ca *clusterAdmin) DescribeDelegationToken(owners []string) ([]RenewableToken, error) {
var tokens []RenewableToken

controller, err := ca.client.Controller()
if err != nil {
return tokens, err
}

resource := AclResourceUser
principals := make([]Principal, len(owners))
for i, x := range owners {
principals[i] = Principal{resource.String(), x}
}

request := DescribeDelegationTokenRequest{Version: 2, Owners: principals}
rsp, err := controller.DescribeDelegationToken(&request)
if err != nil {
return tokens, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return tokens, rsp.ErrorCode
}

return rsp.Tokens, nil
}
48 changes: 48 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,54 @@ func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterCli
return response, nil
}

func (b *Broker) CreateDelegationToken(request *CreateDelegationTokenRequest) (*CreateDelegationTokenResponse, error) {
response := new(CreateDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) RenewDelegationToken(request *RenewDelegationTokenRequest) (*RenewDelegationTokenResponse, error) {
response := new(RenewDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) ExpireDelegationToken(request *ExpireDelegationTokenRequest) (*ExpireDelegationTokenResponse, error) {
response := new(ExpireDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) DescribeDelegationToken(request *DescribeDelegationTokenRequest) (*DescribeDelegationTokenResponse, error) {
response := new(DescribeDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// readFull ensures the conn ReadDeadline has been setup before making a
// call to io.ReadFull
func (b *Broker) readFull(buf []byte) (n int, err error) {
Expand Down
118 changes: 118 additions & 0 deletions delegation_token_create_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package sarama

import "time"

type CreateDelegationTokenRequest struct {
Version int16
OwnerPrincipalType *string
OwnerName *string
Renewers []Principal
MaxLifetime time.Duration
}

func (c *CreateDelegationTokenRequest) encode(pe packetEncoder) (err error) {
if c.Version > 2 {
if err = pe.putNullableCompactString(c.OwnerPrincipalType); err != nil {
return err
}
if err = pe.putNullableCompactString(c.OwnerName); err != nil {
return err
}
}

if c.Version > 1 {
pe.putCompactArrayLength(len(c.Renewers))
} else if err = pe.putArrayLength(len(c.Renewers)); err != nil {
return err
}

for _, r := range c.Renewers {
if err = r.encode(pe, c.Version); err != nil {
return err
}
if c.Version > 1 {
pe.putEmptyTaggedFieldArray()
}
}

pe.putInt64(c.MaxLifetime.Milliseconds())

if c.Version > 1 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (c *CreateDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) {
c.Version = version

if version > 2 {
if c.OwnerPrincipalType, err = pd.getCompactNullableString(); err != nil {
return err
}
if c.OwnerName, err = pd.getCompactNullableString(); err != nil {
return err
}
}

var n int
if version > 1 {
n, err = pd.getCompactArrayLength()
} else {
n, err = pd.getArrayLength()
}
if err != nil {
return err
}
c.Renewers = make([]Principal, n)
for i := range c.Renewers {
if err := c.Renewers[i].decode(pd, version); err != nil {
return err
}
if version > 1 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

var ms int64
if ms, err = pd.getInt64(); err == nil {
c.MaxLifetime = time.Duration(ms) * time.Millisecond
}

if version > 1 && err == nil {
_, err = pd.getEmptyTaggedFieldArray()
}

return err
}

func (c *CreateDelegationTokenRequest) key() int16 {
return 38
}

func (c *CreateDelegationTokenRequest) version() int16 {
return c.Version
}

func (c *CreateDelegationTokenRequest) headerVersion() int16 {
if c.Version > 1 {
return 2
}
return 1
}

func (c *CreateDelegationTokenRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 3
}

func (c *CreateDelegationTokenRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 3:
return V3_3_0_0
default:
return V1_1_0_0
}
}
Loading
Loading