Skip to content

Commit

Permalink
funlen refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleb Brozhe committed May 16, 2024
1 parent e409e01 commit 3d21f27
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 73 deletions.
6 changes: 0 additions & 6 deletions internal/credentials/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ func (c *Static) Token(ctx context.Context) (token string, err error) {
)
}
defer cc.Close()

client := Ydb_Auth_V1.NewAuthServiceClient(cc)

response, err := client.Login(ctx, &Ydb_Auth.LoginRequest{
OperationParams: &Ydb_Operations.OperationParams{
OperationMode: 0,
Expand All @@ -105,7 +103,6 @@ func (c *Static) Token(ctx context.Context) (token string, err error) {
if err != nil {
return "", xerrors.WithStackTrace(err)
}

switch {
case !response.GetOperation().GetReady():
return "", xerrors.WithStackTrace(
Expand All @@ -114,7 +111,6 @@ func (c *Static) Token(ctx context.Context) (token string, err error) {
response.GetOperation().GetIssues(),
),
)

case response.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS:
return "", xerrors.WithStackTrace(
xerrors.Operation(
Expand All @@ -127,12 +123,10 @@ func (c *Static) Token(ctx context.Context) (token string, err error) {
if err = response.GetOperation().GetResult().UnmarshalTo(&result); err != nil {
return "", xerrors.WithStackTrace(err)
}

expiresAt, err := parseExpiresAt(result.GetToken())
if err != nil {
return "", xerrors.WithStackTrace(err)
}

c.requestAt = time.Now().Add(time.Until(expiresAt) / TokenRefreshDivisor)
c.token = result.GetToken()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (s StreamReader) CloseSend() error {
return s.Stream.CloseSend()
}

//nolint:funlen
func (s StreamReader) Recv() (ServerMessage, error) {
grpcMess, err := s.Stream.Recv()
if xerrors.Is(err, io.EOF) {
Expand Down
143 changes: 80 additions & 63 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,10 @@ func New[PT Item[T], T any](
opts ...option[PT, T],
) *Pool[PT, T] {
p := &Pool[PT, T]{
trace: defaultTrace,
limit: DefaultLimit,
createItem: func(ctx context.Context) (PT, error) {
var item T

return &item, nil
},
done: make(chan struct{}),
trace: defaultTrace,
limit: DefaultLimit,
createItem: defaultCreateItem[T, PT],
done: make(chan struct{}),
}

for _, opt := range opts {
Expand All @@ -174,62 +170,38 @@ func New[PT Item[T], T any](
})
}()

createItem := p.createItem
p.createItem = createItemWithTimeoutHandling(p.createItem, p)

p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &safeStats{
v: stats.Stats{Limit: p.limit},
onChange: p.trace.OnChange,
}

return p
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
var item T

return &item, nil
}

p.createItem = func(ctx context.Context) (PT, error) {
// createItemWithTimeoutHandling wraps the createItem function with timeout handling
func createItemWithTimeoutHandling[PT Item[T], T any](
createItem func(ctx context.Context) (PT, error),
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
var (
ch = make(chan PT)
createErr error
)
go func() {
defer close(ch)
createErr = func() error {
var (
createCtx = xcontext.ValueOnly(ctx)
cancelCreate context.CancelFunc
)
if d := p.createTimeout; d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
}
defer cancelCreate()

newItem, err := createItem(createCtx)
if err != nil {
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = p.closeItem(ctx, newItem)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)

case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) < p.limit {
p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
needCloseItem = false
}

return xerrors.WithStackTrace(ctx.Err())

case ch <- newItem:
needCloseItem = false

return nil
}
}()
createErr = createItemWithContext(ctx, p, createItem, ch)
}()

select {
Expand All @@ -249,14 +221,59 @@ func New[PT Item[T], T any](
return item, nil
}
}
p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &safeStats{
v: stats.Stats{Limit: p.limit},
onChange: p.trace.OnChange,
}

// createItemWithContext handles the creation of an item with context handling
func createItemWithContext[PT Item[T], T any](
ctx context.Context,
p *Pool[PT, T],
createItem func(ctx context.Context) (PT, error),
ch chan PT,
) error {
var (
createCtx = xcontext.ValueOnly(ctx)
cancelCreate context.CancelFunc
)

if d := p.createTimeout; d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
}
defer cancelCreate()

return p
newItem, err := createItem(createCtx)
if err != nil {
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = p.closeItem(ctx, newItem)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) < p.limit {
p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
needCloseItem = false
}

return xerrors.WithStackTrace(ctx.Err())
case ch <- newItem:
needCloseItem = false

return nil
}
}

func (p *Pool[PT, T]) Stats() stats.Stats {
Expand Down
1 change: 1 addition & 0 deletions internal/scripting/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (c *Client) StreamExecute(
return r, xerrors.WithStackTrace(err)
}

//nolint:funlen
func (c *Client) streamExecute(
ctx context.Context,
query string,
Expand Down
1 change: 1 addition & 0 deletions log/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func Coordination(l Logger, d trace.Detailer, opts ...Option) (t trace.Coordinat
return internalCoordination(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalCoordination(
l *wrapper, //nolint:interfacer
d trace.Detailer,
Expand Down
3 changes: 2 additions & 1 deletion log/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ func Driver(l Logger, d trace.Detailer, opts ...Option) (t trace.Driver) {
return internalDriver(wrapLogger(l, opts...), d)
}

func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo
//nolint:gocyclo,funlen
func internalDriver(l Logger, d trace.Detailer) trace.Driver {
return trace.Driver{
OnResolve: func(
info trace.DriverResolveStartInfo,
Expand Down
2 changes: 1 addition & 1 deletion log/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Query(l Logger, d trace.Detailer, opts ...Option) (t trace.Query) {
return internalQuery(wrapLogger(l, opts...), d)
}

//nolint:gocyclo
//nolint:gocyclo,funlen
func internalQuery(
l *wrapper, //nolint:interfacer
d trace.Detailer,
Expand Down
1 change: 1 addition & 0 deletions log/scripting.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func Scripting(l Logger, d trace.Detailer, opts ...Option) (t trace.Scripting) {
return internalScripting(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalScripting(l *wrapper, d trace.Detailer) (t trace.Scripting) {
t.OnExecute = func(info trace.ScriptingExecuteStartInfo) func(trace.ScriptingExecuteDoneInfo) {
if d.Details()&trace.ScriptingEvents == 0 {
Expand Down
1 change: 1 addition & 0 deletions log/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func DatabaseSQL(l Logger, d trace.Detailer, opts ...Option) (t trace.DatabaseSQ
return internalDatabaseSQL(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalDatabaseSQL(l *wrapper, d trace.Detailer) (t trace.DatabaseSQL) {
t.OnConnectorConnect = func(
info trace.DatabaseSQLConnectorConnectStartInfo,
Expand Down
2 changes: 1 addition & 1 deletion log/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Table(l Logger, d trace.Detailer, opts ...Option) (t trace.Table) {
return internalTable(wrapLogger(l, opts...), d)
}

//nolint:gocyclo
//nolint:gocyclo,funlen
func internalTable(l *wrapper, d trace.Detailer) (t trace.Table) {
t.OnDo = func(
info trace.TableDoStartInfo,
Expand Down
3 changes: 2 additions & 1 deletion log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ func Topic(l Logger, d trace.Detailer, opts ...Option) (t trace.Topic) {
return internalTopic(wrapLogger(l, opts...), d)
}

func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocyclo
//nolint:gocyclo,funlen
func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
t.OnReaderReconnect = func(
info trace.TopicReaderReconnectStartInfo,
) func(doneInfo trace.TopicReaderReconnectDoneInfo) {
Expand Down

0 comments on commit 3d21f27

Please sign in to comment.