diff --git a/internal/conn/conn.go b/internal/conn/conn.go index e0e6793a4..7fe18cfcf 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -432,6 +432,7 @@ func (c *conn) NewStream( s := &grpcClientStream{ parentConn: c, + stream: nil, streamCtx: ctx, streamCancel: cancel, wrapping: useWrapping, diff --git a/internal/credentials/oauth2.go b/internal/credentials/oauth2.go index f49826127..a001836cb 100644 --- a/internal/credentials/oauth2.go +++ b/internal/credentials/oauth2.go @@ -179,12 +179,14 @@ func (tokenSource *tokenSourceOption) ApplyOauth2CredentialsOption(c *oauth2Toke func WithSubjectToken(subjectToken TokenSource) *tokenSourceOption { return &tokenSourceOption{ source: subjectToken, + createFunc: nil, tokenSourceType: SubjectTokenSourceType, } } func WithFixedSubjectToken(token, tokenType string) *tokenSourceOption { return &tokenSourceOption{ + source: nil, createFunc: func() (TokenSource, error) { return NewFixedTokenSource(token, tokenType), nil }, @@ -194,6 +196,7 @@ func WithFixedSubjectToken(token, tokenType string) *tokenSourceOption { func WithJWTSubjectToken(opts ...JWTTokenSourceOption) *tokenSourceOption { return &tokenSourceOption{ + source: nil, createFunc: func() (TokenSource, error) { return NewJWTTokenSource(opts...) }, @@ -205,12 +208,14 @@ func WithJWTSubjectToken(opts ...JWTTokenSourceOption) *tokenSourceOption { func WithActorToken(actorToken TokenSource) *tokenSourceOption { return &tokenSourceOption{ source: actorToken, + createFunc: nil, tokenSourceType: ActorTokenSourceType, } } func WithFixedActorToken(token, tokenType string) *tokenSourceOption { return &tokenSourceOption{ + source: nil, createFunc: func() (TokenSource, error) { return NewFixedTokenSource(token, tokenType), nil }, @@ -220,6 +225,7 @@ func WithFixedActorToken(token, tokenType string) *tokenSourceOption { func WithJWTActorToken(opts ...JWTTokenSourceOption) *tokenSourceOption { return &tokenSourceOption{ + source: nil, createFunc: func() (TokenSource, error) { return NewJWTTokenSource(opts...) }, @@ -265,10 +271,21 @@ func NewOauth2TokenExchangeCredentials( opts ...Oauth2TokenExchangeCredentialsOption, ) (*oauth2TokenExchange, error) { c := &oauth2TokenExchange{ - grantType: "urn:ietf:params:oauth:grant-type:token-exchange", - requestedTokenType: "urn:ietf:params:oauth:token-type:access_token", - requestTimeout: defaultRequestTimeout, - sourceInfo: stack.Record(1), + tokenEndpoint: "", + grantType: "urn:ietf:params:oauth:grant-type:token-exchange", + resource: "", + audience: nil, + scope: nil, + requestedTokenType: "urn:ietf:params:oauth:token-type:access_token", + subjectTokenSource: nil, + actorTokenSource: nil, + requestTimeout: defaultRequestTimeout, + receivedToken: "", + updateTokenTime: time.Time{}, + receivedTokenExpireTime: time.Time{}, + mutex: sync.RWMutex{}, + updating: atomic.Bool{}, + sourceInfo: stack.Record(1), } var err error @@ -452,8 +469,10 @@ func (provider *oauth2TokenExchange) exchangeToken(ctx context.Context, now time req.Close = true client := http.Client{ - Transport: http.DefaultTransport, - Timeout: provider.requestTimeout, + Transport: http.DefaultTransport, + CheckRedirect: nil, + Jar: nil, + Timeout: provider.requestTimeout, } result, err := client.Do(req) @@ -756,7 +775,14 @@ func WithRSAPrivateKeyPEMFile(path string) *rsaPrivateKeyPemFileOption { func NewJWTTokenSource(opts ...JWTTokenSourceOption) (*jwtTokenSource, error) { s := &jwtTokenSource{ - tokenTTL: defaultJWTTokenTTL, + signingMethod: nil, + keyID: "", + privateKey: nil, + issuer: "", + subject: "", + audience: nil, + id: "", + tokenTTL: defaultJWTTokenTTL, } var err error @@ -801,6 +827,8 @@ func (s *jwtTokenSource) Token() (Token, error) { err error ) t := jwt.Token{ + Raw: "", + Method: s.signingMethod, Header: map[string]interface{}{ "typ": "JWT", "alg": s.signingMethod.Alg(), @@ -812,9 +840,11 @@ func (s *jwtTokenSource) Token() (Token, error) { IssuedAt: issued, Audience: s.audience, ExpiresAt: expire, + NotBefore: nil, ID: s.id, }, - Method: s.signingMethod, + Signature: "", + Valid: false, } var token Token diff --git a/internal/table/session.go b/internal/table/session.go index db660143e..9e1675f2d 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -664,6 +664,7 @@ func renameTables( operationCancelAfter, operation.ModeSync, ), + Tables: nil, } for _, opt := range opts { if opt != nil { diff --git a/internal/topic/topicwriterinternal/queue.go b/internal/topic/topicwriterinternal/queue.go index f7d097a18..69f872f46 100644 --- a/internal/topic/topicwriterinternal/queue.go +++ b/internal/topic/topicwriterinternal/queue.go @@ -51,18 +51,19 @@ type messageQueue struct { func newMessageQueue() messageQueue { return messageQueue{ - messagesByOrder: make(map[int]messageWithDataContent), - seqNoToOrderID: make(map[int64]int), - hasNewMessages: make(empty.Chan, 1), - closedChan: make(empty.Chan), - lastSeqNo: -1, - OnAckReceived: nil, - closedErr: nil, - acksReceivedEvent: xsync.EventBroadcast{}, - m: xsync.RWMutex{RWMutex: sync.RWMutex{}}, - closed: false, - lastWrittenIndex: 0, - lastSentIndex: 0, + messagesByOrder: make(map[int]messageWithDataContent), + seqNoToOrderID: make(map[int64]int), + hasNewMessages: make(empty.Chan, 1), + closedChan: make(empty.Chan), + lastSeqNo: -1, + OnAckReceived: nil, + closedErr: nil, + acksReceivedEvent: xsync.EventBroadcast{}, + m: xsync.RWMutex{RWMutex: sync.RWMutex{}}, + stopReceiveMessagesReason: nil, + closed: false, + lastWrittenIndex: 0, + lastSentIndex: 0, } } diff --git a/internal/xcontext/cancels_quard.go b/internal/xcontext/cancels_quard.go index b2743e656..e9514b99d 100644 --- a/internal/xcontext/cancels_quard.go +++ b/internal/xcontext/cancels_quard.go @@ -14,6 +14,7 @@ type ( func NewCancelsGuard() *CancelsGuard { return &CancelsGuard{ + mu: sync.Mutex{}, cancels: make(map[*context.CancelFunc]struct{}), } } diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index 5124ed1d2..237ebea0d 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -229,6 +229,7 @@ func Open(parent ydbDriver, opts ...ConnectorOption) (_ *Connector, err error) { pathNormalizer: bind.TablePathPrefix(parent.Name()), trace: nil, traceRetry: nil, + retryBudget: nil, } for _, opt := range opts { if opt != nil { diff --git a/internal/xsync/last_usage_guard.go b/internal/xsync/last_usage_guard.go index f11d5ba49..6a30bb73e 100644 --- a/internal/xsync/last_usage_guard.go +++ b/internal/xsync/last_usage_guard.go @@ -29,6 +29,8 @@ func WithClock(clock clockwork.Clock) lastUsageOption { func NewLastUsage(opts ...lastUsageOption) *lastUsage { lastUsage := &lastUsage{ + locks: atomic.Int64{}, + t: atomic.Pointer[time.Time]{}, clock: clockwork.NewRealClock(), } for _, opt := range opts { diff --git a/log/driver.go b/log/driver.go index 57a95484d..db66137a2 100644 --- a/log/driver.go +++ b/log/driver.go @@ -503,5 +503,6 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo OnPoolRelease: nil, OnConnPark: nil, OnBalancerClusterDiscoveryAttempt: nil, + OnConnStreamFinish: nil, } } diff --git a/retry/budget/budget.go b/retry/budget/budget.go index 2f2625178..94d78e9e3 100644 --- a/retry/budget/budget.go +++ b/retry/budget/budget.go @@ -39,8 +39,10 @@ func withFixedBudgetClock(clock clockwork.Clock) fixedBudgetOption { // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental func Limited(attemptsPerSecond int, opts ...fixedBudgetOption) *fixedBudget { q := &fixedBudget{ - clock: clockwork.NewRealClock(), - done: make(chan struct{}), + clock: clockwork.NewRealClock(), + ticker: nil, + quota: nil, + done: make(chan struct{}), } for _, opt := range opts { opt(q) diff --git a/retry/retry.go b/retry/retry.go index 1a2304689..b5b8cdaab 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -126,7 +126,7 @@ func WithTrace(t *trace.Retry) traceOption { return traceOption{t: t} } -var _ Option = budgetOption{} +var _ Option = budgetOption{b: nil} type budgetOption struct { b budget.Budget