Skip to content

Commit

Permalink
Merge pull request #1232 from brojeg/master
Browse files Browse the repository at this point in the history
Enable funlen linter (part 5)
  • Loading branch information
asmyasnikov authored May 29, 2024
2 parents a9fd2f7 + 71ed802 commit 56a634a
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (s StreamReader) CloseSend() error {
return s.Stream.CloseSend()
}

//nolint:funlen
func (s StreamReader) Recv() (ServerMessage, error) {
grpcMess, err := s.Stream.Recv()
if xerrors.Is(err, io.EOF) {
Expand Down
143 changes: 80 additions & 63 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,10 @@ func New[PT Item[T], T any](
opts ...option[PT, T],
) *Pool[PT, T] {
p := &Pool[PT, T]{
trace: defaultTrace,
limit: DefaultLimit,
createItem: func(ctx context.Context) (PT, error) {
var item T

return &item, nil
},
done: make(chan struct{}),
trace: defaultTrace,
limit: DefaultLimit,
createItem: defaultCreateItem[T, PT],
done: make(chan struct{}),
}

for _, opt := range opts {
Expand All @@ -174,62 +170,38 @@ func New[PT Item[T], T any](
})
}()

createItem := p.createItem
p.createItem = createItemWithTimeoutHandling(p.createItem, p)

p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &safeStats{
v: stats.Stats{Limit: p.limit},
onChange: p.trace.OnChange,
}

return p
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
var item T

return &item, nil
}

p.createItem = func(ctx context.Context) (PT, error) {
// createItemWithTimeoutHandling wraps the createItem function with timeout handling
func createItemWithTimeoutHandling[PT Item[T], T any](
createItem func(ctx context.Context) (PT, error),
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
var (
ch = make(chan PT)
createErr error
)
go func() {
defer close(ch)
createErr = func() error {
var (
createCtx = xcontext.ValueOnly(ctx)
cancelCreate context.CancelFunc
)
if d := p.createTimeout; d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
}
defer cancelCreate()

newItem, err := createItem(createCtx)
if err != nil {
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = p.closeItem(ctx, newItem)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)

case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) < p.limit {
p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
needCloseItem = false
}

return xerrors.WithStackTrace(ctx.Err())

case ch <- newItem:
needCloseItem = false

return nil
}
}()
createErr = createItemWithContext(ctx, p, createItem, ch)
}()

select {
Expand All @@ -249,14 +221,59 @@ func New[PT Item[T], T any](
return item, nil
}
}
p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &safeStats{
v: stats.Stats{Limit: p.limit},
onChange: p.trace.OnChange,
}

// createItemWithContext handles the creation of an item with context handling
func createItemWithContext[PT Item[T], T any](
ctx context.Context,
p *Pool[PT, T],
createItem func(ctx context.Context) (PT, error),
ch chan PT,
) error {
var (
createCtx = xcontext.ValueOnly(ctx)
cancelCreate context.CancelFunc
)

if d := p.createTimeout; d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
}
defer cancelCreate()

return p
newItem, err := createItem(createCtx)
if err != nil {
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = p.closeItem(ctx, newItem)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) < p.limit {
p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
needCloseItem = false
}

return xerrors.WithStackTrace(ctx.Err())
case ch <- newItem:
needCloseItem = false

return nil
}
}

func (p *Pool[PT, T]) Stats() stats.Stats {
Expand Down
1 change: 1 addition & 0 deletions internal/scripting/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (c *Client) StreamExecute(
return r, xerrors.WithStackTrace(err)
}

//nolint:funlen
func (c *Client) streamExecute(
ctx context.Context,
query string,
Expand Down
1 change: 1 addition & 0 deletions log/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func Coordination(l Logger, d trace.Detailer, opts ...Option) (t trace.Coordinat
return internalCoordination(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalCoordination(
l *wrapper, //nolint:interfacer
d trace.Detailer,
Expand Down
3 changes: 2 additions & 1 deletion log/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ func Driver(l Logger, d trace.Detailer, opts ...Option) (t trace.Driver) {
return internalDriver(wrapLogger(l, opts...), d)
}

func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo
//nolint:gocyclo,funlen
func internalDriver(l Logger, d trace.Detailer) trace.Driver {
return trace.Driver{
OnResolve: func(
info trace.DriverResolveStartInfo,
Expand Down
2 changes: 1 addition & 1 deletion log/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Query(l Logger, d trace.Detailer, opts ...Option) (t trace.Query) {
return internalQuery(wrapLogger(l, opts...), d)
}

//nolint:gocyclo
//nolint:gocyclo,funlen
func internalQuery(
l *wrapper, //nolint:interfacer
d trace.Detailer,
Expand Down
1 change: 1 addition & 0 deletions log/scripting.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func Scripting(l Logger, d trace.Detailer, opts ...Option) (t trace.Scripting) {
return internalScripting(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalScripting(l *wrapper, d trace.Detailer) (t trace.Scripting) {
t.OnExecute = func(info trace.ScriptingExecuteStartInfo) func(trace.ScriptingExecuteDoneInfo) {
if d.Details()&trace.ScriptingEvents == 0 {
Expand Down
1 change: 1 addition & 0 deletions log/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func DatabaseSQL(l Logger, d trace.Detailer, opts ...Option) (t trace.DatabaseSQ
return internalDatabaseSQL(wrapLogger(l, opts...), d)
}

//nolint:funlen
func internalDatabaseSQL(l *wrapper, d trace.Detailer) (t trace.DatabaseSQL) {
t.OnConnectorConnect = func(
info trace.DatabaseSQLConnectorConnectStartInfo,
Expand Down
2 changes: 1 addition & 1 deletion log/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Table(l Logger, d trace.Detailer, opts ...Option) (t trace.Table) {
return internalTable(wrapLogger(l, opts...), d)
}

//nolint:gocyclo
//nolint:gocyclo,funlen
func internalTable(l *wrapper, d trace.Detailer) (t trace.Table) {
t.OnDo = func(
info trace.TableDoStartInfo,
Expand Down
3 changes: 2 additions & 1 deletion log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ func Topic(l Logger, d trace.Detailer, opts ...Option) (t trace.Topic) {
return internalTopic(wrapLogger(l, opts...), d)
}

func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocyclo
//nolint:gocyclo,funlen
func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
t.OnReaderReconnect = func(
info trace.TopicReaderReconnectStartInfo,
) func(doneInfo trace.TopicReaderReconnectDoneInfo) {
Expand Down

0 comments on commit 56a634a

Please sign in to comment.