From cccc0a6c7d1b2747fd97d02504d5fd39b9042755 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Mon, 13 Jan 2025 15:14:05 +0100 Subject: [PATCH] feat: max procs --- maxprocs/maxprocs.go | 104 ++++++++++++++++++++++++ maxprocs/maxprocs_test.go | 165 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 maxprocs/maxprocs.go create mode 100644 maxprocs/maxprocs_test.go diff --git a/maxprocs/maxprocs.go b/maxprocs/maxprocs.go new file mode 100644 index 00000000..a7202f58 --- /dev/null +++ b/maxprocs/maxprocs.go @@ -0,0 +1,104 @@ +package maxprocs + +import ( + "math" + "runtime" + "strconv" + "strings" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" +) + +func init() { + SetWithConfig(config.New(), + WithLogger(logger.NewLogger().Child("maxprocs")), + ) +} + +type conf struct { + logger logger.Logger + minProcs int + cpuRequestsMultiplier float64 + roundQuotaFunc func(float64) int +} + +type Option func(*conf) + +func WithLogger(logger logger.Logger) Option { + return func(c *conf) { c.logger = logger } +} + +func WithMinProcs(minProcs int) Option { + return func(c *conf) { c.minProcs = minProcs } +} + +func WithCPURequestsMultiplier(cpuRequestsMultiplier float64) Option { + return func(c *conf) { c.cpuRequestsMultiplier = cpuRequestsMultiplier } +} + +func WithRoundQuotaFunc(roundQuotaFunc func(float64) int) Option { + return func(c *conf) { c.roundQuotaFunc = roundQuotaFunc } +} + +func Set(cpuRequests string, opts ...Option) { + conf := &conf{ + logger: logger.NOP, + minProcs: 1, + cpuRequestsMultiplier: 3, + roundQuotaFunc: roundQuotaCeil, + } + for _, opt := range opts { + opt(conf) + } + + cpuRequest := 1.0 + if strings.HasSuffix(cpuRequests, "m") { + value, err := strconv.Atoi(strings.TrimSuffix(cpuRequests, "m")) + if err == nil { + cpuRequest = float64(value) / 1000 + } else { + conf.logger.Warnn("unable to parse CPU requests with Atoi, using default value") + } + } else { + value, err := strconv.ParseFloat(cpuRequests, 64) + if err == nil { + cpuRequest = value + } else { + conf.logger.Warnn("unable to parse CPU requests with ParseFloat, using default value") + } + } + + // Calculate GOMAXPROCS + gomaxprocs := conf.roundQuotaFunc(cpuRequest * conf.cpuRequestsMultiplier) + + if gomaxprocs < conf.minProcs { + gomaxprocs = conf.minProcs + } + + // Set GOMAXPROCS + runtime.GOMAXPROCS(gomaxprocs) +} + +func SetWithConfig(c *config.Config, opts ...Option) { + conf := &conf{ + logger: logger.NOP, + minProcs: c.GetInt("MaxProcs.MinProcs", 1), + cpuRequestsMultiplier: c.GetFloat64("MaxProcs.CPURequestsMultiplier", 3), + roundQuotaFunc: roundQuotaCeil, + } + for _, opt := range opts { + opt(conf) + } + + Set(c.GetString("MaxProcs.CPURequests", "1"), + WithLogger(conf.logger), + WithMinProcs(conf.minProcs), + WithCPURequestsMultiplier(conf.cpuRequestsMultiplier), + WithRoundQuotaFunc(conf.roundQuotaFunc), + ) +} + +func roundQuotaCeil(f float64) int { + return int(math.Ceil(f)) +} diff --git a/maxprocs/maxprocs_test.go b/maxprocs/maxprocs_test.go new file mode 100644 index 00000000..b969c38c --- /dev/null +++ b/maxprocs/maxprocs_test.go @@ -0,0 +1,165 @@ +package maxprocs_test + +import ( + "math" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger/mock_logger" + "github.com/rudderlabs/rudder-go-kit/maxprocs" +) + +func TestSet_Default(t *testing.T) { + before := runtime.GOMAXPROCS(0) // Capture original value + defer runtime.GOMAXPROCS(before) // Restore after test + + maxprocs.Set("500m") + require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 500m * 3 = 1.5 → ceil = 2 +} + +func TestSetWithConfig_Default(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "500m") + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + maxprocs.SetWithConfig(cfg) + + require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 500m * 3 = 1.5 → ceil = 2 +} + +func TestSet_WithInvalidCPURequest_Invalid1(t *testing.T) { + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + ctrl := gomock.NewController(t) + mockLog := mock_logger.NewMockLogger(ctrl) + mockLog.EXPECT().Warnn("unable to parse CPU requests with ParseFloat, using default value").Times(1) + + maxprocs.Set("invalid", maxprocs.WithLogger(mockLog)) + + require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3 +} + +func TestSetWithConfig_WithInvalidCPURequest_Invalid1(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "invalid") + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + ctrl := gomock.NewController(t) + mockLog := mock_logger.NewMockLogger(ctrl) + mockLog.EXPECT().Warnn("unable to parse CPU requests with ParseFloat, using default value").Times(1) + + maxprocs.SetWithConfig(cfg, maxprocs.WithLogger(mockLog)) + + require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3 +} + +func TestSet_WithInvalidCPURequest_Invalid2(t *testing.T) { + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + ctrl := gomock.NewController(t) + mockLog := mock_logger.NewMockLogger(ctrl) + mockLog.EXPECT().Warnn("unable to parse CPU requests with Atoi, using default value").Times(1) + + maxprocs.Set("invalid_m", maxprocs.WithLogger(mockLog)) + + require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3 +} + +func TestSetWithConfig_WithInvalidCPURequest_Invalid2(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "invalid_m") + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + ctrl := gomock.NewController(t) + mockLog := mock_logger.NewMockLogger(ctrl) + mockLog.EXPECT().Warnn("unable to parse CPU requests with Atoi, using default value").Times(1) + + maxprocs.SetWithConfig(cfg, maxprocs.WithLogger(mockLog)) + + require.Equal(t, 3, runtime.GOMAXPROCS(0)) // Defaults to 1 * 3 → ceil = 3 +} + +func TestSet_WithMinProcs(t *testing.T) { + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + maxprocs.Set("100m", maxprocs.WithMinProcs(5)) + + require.Equal(t, 5, runtime.GOMAXPROCS(0)) // MinProcs overrides calculated value +} + +func TestSetWithConfig_WithMinProcs(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "100m") + cfg.Set("MaxProcs.MinProcs", 5) + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + maxprocs.SetWithConfig(cfg) + + require.Equal(t, 5, runtime.GOMAXPROCS(0)) // MinProcs overrides calculated value +} + +func TestSet_WithMultiplier(t *testing.T) { + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + maxprocs.Set("300m", maxprocs.WithCPURequestsMultiplier(4)) + + require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 300m * 4 = 1.2 → ceil = 2 +} + +func TestSetWithConfig_WithMultiplier(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "300m") + cfg.Set("MaxProcs.CPURequestsMultiplier", 4) + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + maxprocs.SetWithConfig(cfg) + + require.Equal(t, 2, runtime.GOMAXPROCS(0)) // 300m * 4 = 1.2 → ceil = 2 +} + +func TestSet_CustomRoundQuotaFunc(t *testing.T) { + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + roundFloor := func(f float64) int { + return int(math.Floor(f)) + } + + maxprocs.Set("1500m", maxprocs.WithRoundQuotaFunc(roundFloor)) + + require.Equal(t, 4, runtime.GOMAXPROCS(0)) // 1500m * 3 = 4.5 → floor = 4 +} + +func TestSetWithConfig_CustomRoundQuotaFunc(t *testing.T) { + cfg := config.New() + cfg.Set("MaxProcs.CPURequests", "1500m") + + before := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(before) + + roundFloor := func(f float64) int { + return int(math.Floor(f)) + } + + maxprocs.SetWithConfig(cfg, maxprocs.WithRoundQuotaFunc(roundFloor)) + + require.Equal(t, 4, runtime.GOMAXPROCS(0)) // 1500m * 3 = 4.5 → floor = 4 +}