Skip to content

Commit

Permalink
feat: max procs
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Jan 13, 2025
1 parent d794c02 commit cccc0a6
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 0 deletions.
104 changes: 104 additions & 0 deletions maxprocs/maxprocs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package maxprocs

import (
"math"
"runtime"
"strconv"
"strings"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

func init() {
SetWithConfig(config.New(),
WithLogger(logger.NewLogger().Child("maxprocs")),
)
}

type conf struct {
logger logger.Logger
minProcs int
cpuRequestsMultiplier float64
roundQuotaFunc func(float64) int
}

type Option func(*conf)

func WithLogger(logger logger.Logger) Option {
return func(c *conf) { c.logger = logger }
}

func WithMinProcs(minProcs int) Option {
return func(c *conf) { c.minProcs = minProcs }
}

func WithCPURequestsMultiplier(cpuRequestsMultiplier float64) Option {
return func(c *conf) { c.cpuRequestsMultiplier = cpuRequestsMultiplier }
}

func WithRoundQuotaFunc(roundQuotaFunc func(float64) int) Option {
return func(c *conf) { c.roundQuotaFunc = roundQuotaFunc }
}

func Set(cpuRequests string, opts ...Option) {
conf := &conf{
logger: logger.NOP,
minProcs: 1,
cpuRequestsMultiplier: 3,
roundQuotaFunc: roundQuotaCeil,
}
for _, opt := range opts {
opt(conf)
}

cpuRequest := 1.0
if strings.HasSuffix(cpuRequests, "m") {
value, err := strconv.Atoi(strings.TrimSuffix(cpuRequests, "m"))
if err == nil {
cpuRequest = float64(value) / 1000
} else {
conf.logger.Warnn("unable to parse CPU requests with Atoi, using default value")
}
} else {
value, err := strconv.ParseFloat(cpuRequests, 64)
if err == nil {
cpuRequest = value
} else {
conf.logger.Warnn("unable to parse CPU requests with ParseFloat, using default value")
}
}

// Calculate GOMAXPROCS
gomaxprocs := conf.roundQuotaFunc(cpuRequest * conf.cpuRequestsMultiplier)

if gomaxprocs < conf.minProcs {
gomaxprocs = conf.minProcs
}

// Set GOMAXPROCS
runtime.GOMAXPROCS(gomaxprocs)
}

func SetWithConfig(c *config.Config, opts ...Option) {
conf := &conf{
logger: logger.NOP,
minProcs: c.GetInt("MaxProcs.MinProcs", 1),
cpuRequestsMultiplier: c.GetFloat64("MaxProcs.CPURequestsMultiplier", 3),
roundQuotaFunc: roundQuotaCeil,
}
for _, opt := range opts {
opt(conf)
}

Set(c.GetString("MaxProcs.CPURequests", "1"),
WithLogger(conf.logger),
WithMinProcs(conf.minProcs),
WithCPURequestsMultiplier(conf.cpuRequestsMultiplier),
WithRoundQuotaFunc(conf.roundQuotaFunc),
)
}

func roundQuotaCeil(f float64) int {
return int(math.Ceil(f))
}
165 changes: 165 additions & 0 deletions maxprocs/maxprocs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package maxprocs_test

import (
"math"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger/mock_logger"
"github.com/rudderlabs/rudder-go-kit/maxprocs"
)

func TestSet_Default(t *testing.T) {
before := runtime.GOMAXPROCS(0) // Capture original value
defer runtime.GOMAXPROCS(before) // Restore after test

maxprocs.Set("500m")
require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 500m * 3 = 1.5 → ceil = 2
}

func TestSetWithConfig_Default(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "500m")

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

maxprocs.SetWithConfig(cfg)

require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 500m * 3 = 1.5 → ceil = 2
}

func TestSet_WithInvalidCPURequest_Invalid1(t *testing.T) {
before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

ctrl := gomock.NewController(t)
mockLog := mock_logger.NewMockLogger(ctrl)
mockLog.EXPECT().Warnn("unable to parse CPU requests with ParseFloat, using default value").Times(1)

maxprocs.Set("invalid", maxprocs.WithLogger(mockLog))

require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3
}

func TestSetWithConfig_WithInvalidCPURequest_Invalid1(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "invalid")

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

ctrl := gomock.NewController(t)
mockLog := mock_logger.NewMockLogger(ctrl)
mockLog.EXPECT().Warnn("unable to parse CPU requests with ParseFloat, using default value").Times(1)

maxprocs.SetWithConfig(cfg, maxprocs.WithLogger(mockLog))

require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3
}

func TestSet_WithInvalidCPURequest_Invalid2(t *testing.T) {
before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

ctrl := gomock.NewController(t)
mockLog := mock_logger.NewMockLogger(ctrl)
mockLog.EXPECT().Warnn("unable to parse CPU requests with Atoi, using default value").Times(1)

maxprocs.Set("invalid_m", maxprocs.WithLogger(mockLog))

require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3
}

func TestSetWithConfig_WithInvalidCPURequest_Invalid2(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "invalid_m")

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

ctrl := gomock.NewController(t)
mockLog := mock_logger.NewMockLogger(ctrl)
mockLog.EXPECT().Warnn("unable to parse CPU requests with Atoi, using default value").Times(1)

maxprocs.SetWithConfig(cfg, maxprocs.WithLogger(mockLog))

require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3
}

func TestSet_WithMinProcs(t *testing.T) {
before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

maxprocs.Set("100m", maxprocs.WithMinProcs(5))

require.Equal(t, 5, runtime.GOMAXPROCS(0)) // MinProcs overrides calculated value
}

func TestSetWithConfig_WithMinProcs(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "100m")
cfg.Set("MaxProcs.MinProcs", 5)

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

maxprocs.SetWithConfig(cfg)

require.Equal(t, 5, runtime.GOMAXPROCS(0)) // MinProcs overrides calculated value
}

func TestSet_WithMultiplier(t *testing.T) {
before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

maxprocs.Set("300m", maxprocs.WithCPURequestsMultiplier(4))

require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 300m * 4 = 1.2 → ceil = 2
}

func TestSetWithConfig_WithMultiplier(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "300m")
cfg.Set("MaxProcs.CPURequestsMultiplier", 4)

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

maxprocs.SetWithConfig(cfg)

require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 300m * 4 = 1.2 → ceil = 2
}

func TestSet_CustomRoundQuotaFunc(t *testing.T) {
before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

roundFloor := func(f float64) int {
return int(math.Floor(f))
}

maxprocs.Set("1500m", maxprocs.WithRoundQuotaFunc(roundFloor))

require.Equal(t, 4, runtime.GOMAXPROCS(0)) // 1500m * 3 = 4.5 → floor = 4
}

func TestSetWithConfig_CustomRoundQuotaFunc(t *testing.T) {
cfg := config.New()
cfg.Set("MaxProcs.CPURequests", "1500m")

before := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(before)

roundFloor := func(f float64) int {
return int(math.Floor(f))
}

maxprocs.SetWithConfig(cfg, maxprocs.WithRoundQuotaFunc(roundFloor))

require.Equal(t, 4, runtime.GOMAXPROCS(0)) // 1500m * 3 = 4.5 → floor = 4
}

0 comments on commit cccc0a6

Please sign in to comment.