Skip to content

Commit

Permalink
Add support for native distributed execution and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Jul 13, 2023
1 parent ab456b2 commit a9ece8a
Show file tree
Hide file tree
Showing 12 changed files with 2,018 additions and 7 deletions.
193 changes: 193 additions & 0 deletions cmd/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"google.golang.org/grpc"
"gopkg.in/guregu/null.v3"
)

// getMetricsHook returns a hook for the metrics engine that periodically
// ships the locally observed metrics to the coordinator.
//
// When the returned hook is invoked with a metrics engine, it starts a
// goroutine that, once per second, drains the sink of every observed metric
// and sends the result through client.SendMetrics, tagged with instanceID.
// The hook returns a finalize function that triggers one last dump and blocks
// until that goroutine has exited. finalize must be called at most once,
// since it closes a channel.
//
// TODO: something cleaner
func getMetricsHook(
	ctx context.Context, instanceID uint32,
	client distributed.DistributedTestClient, logger logrus.FieldLogger,
) func(*engine.MetricsEngine) func() {
	logger = logger.WithField("component", "metric-engine-hook")
	return func(me *engine.MetricsEngine) func() {
		stop := make(chan struct{})
		done := make(chan struct{})

		// drainMetrics collects the pending data of every observed metric.
		// The metrics lock is held only while draining, so that a slow or
		// blocked SendMetrics call cannot stall other users of the lock.
		drainMetrics := func() []*distributed.MetricDump {
			me.MetricsLock.Lock()
			defer me.MetricsLock.Unlock()

			dumps := make([]*distributed.MetricDump, 0, len(me.ObservedMetrics))
			for _, om := range me.ObservedMetrics {
				data, err := om.Sink.Drain()
				if err != nil {
					logger.Errorf("There was a problem draining the sink for metric %s: %s", om.Name, err)
					// The returned data is not meaningful after an error, so
					// it is not forwarded to the coordinator.
					continue
				}
				dumps = append(dumps, &distributed.MetricDump{
					Name: om.Name,
					Data: data,
				})
			}
			return dumps
		}

		// dumpMetrics is only ever called from the single goroutine below, so
		// dumps are sent strictly one after the other.
		dumpMetrics := func() {
			logger.Debug("Starting metric dump...")
			payload := &distributed.MetricsDump{
				InstanceID: instanceID,
				Metrics:    drainMetrics(),
			}
			if _, err := client.SendMetrics(ctx, payload); err != nil {
				logger.Errorf("There was a problem dumping metrics: %s", err)
			}
		}

		go func() {
			defer close(done)
			ticker := time.NewTicker(1 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-ticker.C:
					dumpMetrics()
				case <-stop:
					// One last dump, so whatever was gathered since the
					// previous tick is not lost.
					dumpMetrics()
					return
				}
			}
		}()

		finalize := func() {
			logger.Debug("Final metric dump...")
			close(stop)
			<-done // wait for the final dump to complete
			logger.Debug("Done!")
		}

		return finalize
	}
}

// getCmdAgent builds the hidden `k6 agent` sub-command, which joins a
// distributed test run.
//
// The command takes exactly one argument, the address that is dialed over
// gRPC. Its test loader registers with that server, receives the test archive
// and the options from the registration response, and builds a configured
// test from them with thresholds and the end-of-test summary disabled. It
// also installs the metrics hook that forwards the locally gathered metrics,
// and returns an agent controller bound to the received instance ID.
//
// TODO: a whole lot of cleanup, refactoring, error handling and hardening
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
	c := &cmdsRunAndAgent{gs: gs}

	c.loadConfiguredTest = func(cmd *cobra.Command, args []string) (
		*loadedAndConfiguredTest, execution.Controller, error,
	) {
		// NOTE(review): grpc.WithInsecure is deprecated in grpc-go in favor of
		// WithTransportCredentials(insecure.NewCredentials()); switching needs
		// an import this file does not have yet.
		conn, err := grpc.Dial(args[0], grpc.WithInsecure())
		if err != nil {
			return nil, nil, err
		}
		c.testEndHook = func(err error) {
			// %v instead of %s, so that a nil error is logged as "<nil>" and
			// not as "%!s(<nil>)".
			gs.Logger.Debugf("k6 agent run ended with err=%v", err)
			if cerr := conn.Close(); cerr != nil {
				gs.Logger.WithError(cerr).Warn("Failed to close the gRPC connection")
			}
		}

		client := distributed.NewDistributedTestClient(conn)

		resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{})
		if err != nil {
			return nil, nil, err
		}

		c.metricsEngineHook = getMetricsHook(gs.Ctx, resp.InstanceID, client, gs.Logger)

		controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
		if err != nil {
			return nil, nil, err
		}

		// The options arrive JSON-encoded in the registration response.
		var options lib.Options
		if err := json.Unmarshal(resp.Options, &options); err != nil {
			return nil, nil, err
		}

		arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive))
		if err != nil {
			return nil, nil, err
		}

		registry := metrics.NewRegistry()
		piState := &lib.TestPreInitState{
			Logger: gs.Logger,
			RuntimeOptions: lib.RuntimeOptions{
				// Thresholds and the summary are switched off for the agent.
				NoThresholds:      null.BoolFrom(true),
				NoSummary:         null.BoolFrom(true),
				Env:               arc.Env,
				CompatibilityMode: null.StringFrom(arc.CompatibilityMode),
			},
			Registry:       registry,
			BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
		}

		initRunner, err := js.NewFromArchive(piState, arc)
		if err != nil {
			return nil, nil, err
		}

		test := &loadedTest{
			pwd:            arc.Pwd,
			sourceRootPath: arc.Filename,
			source: &loader.SourceData{
				Data: resp.Archive,
				URL:  arc.FilenameURL,
			},
			fs:           afero.NewMemMapFs(), // TODO: figure out what should be here
			fileSystems:  arc.Filesystems,
			preInitState: piState,
			initRunner:   initRunner,
		}

		// Only the options received from the registration response are used
		// here, on top of the defaults.
		pseudoConsolidatedConfig := applyDefault(Config{Options: options})
		for _, thresholds := range pseudoConsolidatedConfig.Thresholds {
			if err = thresholds.Parse(); err != nil {
				return nil, nil, err
			}
		}
		derivedConfig, err := deriveAndValidateConfig(pseudoConsolidatedConfig, initRunner.IsExecutable, gs.Logger)
		if err != nil {
			return nil, nil, err
		}

		configuredTest := &loadedAndConfiguredTest{
			loadedTest:         test,
			consolidatedConfig: pseudoConsolidatedConfig,
			derivedConfig:      derivedConfig,
		}

		gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server

		return configuredTest, controller, nil // TODO
	}

	agentCmd := &cobra.Command{
		Use:    "agent",
		Short:  "Join a distributed load test",
		Long:   `TODO`,
		Args:   exactArgsWithMsg(1, "arg should be the IP and port of the coordinator k6 instance"),
		RunE:   c.run,
		Hidden: true, // TODO: remove when officially released
	}

	// TODO: add flags

	return agentCmd
}
146 changes: 146 additions & 0 deletions cmd/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package cmd

import (
"fmt"
"net"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics/engine"
"google.golang.org/grpc"
)

// cmdCoordinator carries the state of the `k6 coordinator` sub-command.
type cmdCoordinator struct {
	// gs is the global state the command was created with.
	gs *state.GlobalState

	// gRPCAddress is the address the gRPC server listens on; it is set
	// through the --grpc-addr flag.
	gRPCAddress string

	// instanceCount is the number of distributed instances; it is set
	// through the --instance-count flag.
	instanceCount int
}

// run executes the `k6 coordinator` sub-command.
//
// It loads and configures the local test, creates a metrics engine and a
// coordinator server for c.instanceCount instances, serves the coordinator
// over gRPC on c.gRPCAddress and blocks until coordinator.Wait() returns.
// Unless disabled through the runtime options, thresholds are finalized and
// the end-of-test summary is generated on the way out, in that order.
//
// The named result err lets the deferred threshold handling turn breached
// thresholds into the returned error.
//
//nolint:funlen // TODO: split apart
func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
	ctx, runAbort := execution.NewTestRunContext(c.gs.Ctx, c.gs.Logger)

	test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
	if err != nil {
		return err
	}

	// Only consolidated options, not derived
	testRunState, err := test.buildTestRunState(test.consolidatedConfig.Options)
	if err != nil {
		return err
	}

	metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, c.gs.Logger)
	if err != nil {
		return err
	}

	coordinator, err := distributed.NewCoordinatorServer(
		c.instanceCount, test.initRunner.MakeArchive(), metricsEngine, c.gs.Logger,
	)
	if err != nil {
		return err
	}

	// Bind the address before the deferred summary and threshold handlers are
	// registered: if the address cannot be bound, no test ran, so there is
	// nothing to summarize or to evaluate thresholds against.
	c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
	listener, err := net.Listen("tcp", c.gRPCAddress)
	if err != nil {
		return err
	}

	// Deferred calls run in reverse order, so this summary is generated after
	// the thresholds registered below have been finalized.
	if !testRunState.RuntimeOptions.NoSummary.Bool {
		defer func() {
			c.gs.Logger.Debug("Generating the end-of-test summary...")
			summaryResult, serr := test.initRunner.HandleSummary(ctx, &lib.Summary{
				Metrics:         metricsEngine.ObservedMetrics,
				RootGroup:       test.initRunner.GetDefaultGroup(),
				TestRunDuration: coordinator.GetCurrentTestRunDuration(),
				NoColor:         c.gs.Flags.NoColor,
				UIState: lib.UIState{
					IsStdOutTTY: c.gs.Stdout.IsTTY,
					IsStdErrTTY: c.gs.Stderr.IsTTY,
				},
			})
			if serr == nil {
				serr = handleSummaryResult(c.gs.FS, c.gs.Stdout, c.gs.Stderr, summaryResult)
			}
			if serr != nil {
				c.gs.Logger.WithError(serr).Error("Failed to handle the end-of-test summary")
			}
		}()
	}

	if !testRunState.RuntimeOptions.NoThresholds.Bool {
		getCurrentTestDuration := coordinator.GetCurrentTestRunDuration
		finalizeThresholds := metricsEngine.StartThresholdCalculations(nil, runAbort, getCurrentTestDuration)

		defer func() {
			// This gets called after all of the outputs have stopped, so we are
			// sure there won't be any more metrics being sent.
			c.gs.Logger.Debug("Finalizing thresholds...")
			breachedThresholds := finalizeThresholds()
			if len(breachedThresholds) == 0 {
				return
			}
			tErr := errext.WithAbortReasonIfNone(
				errext.WithExitCodeIfNone(
					fmt.Errorf("thresholds on metrics '%s' have been breached", strings.Join(breachedThresholds, ", ")),
					exitcodes.ThresholdsHaveFailed,
				), errext.AbortedByThresholdsAfterTestEnd)

			// An earlier error takes precedence over the threshold error.
			if err == nil {
				err = tErr
			} else {
				c.gs.Logger.WithError(tErr).Debug("Breached thresholds, but test already exited with another error")
			}
		}()
	}

	grpcServer := grpc.NewServer() // TODO: add auth and a whole bunch of other options
	distributed.RegisterDistributedTestServer(grpcServer, coordinator)

	// NOTE(review): nothing in this function stops the gRPC server once
	// coordinator.Wait() returns.
	go func() {
		err := grpcServer.Serve(listener)
		// %v instead of %s, so that a nil error is logged as "<nil>".
		c.gs.Logger.Debugf("gRPC server end: %v", err)
	}()
	coordinator.Wait()
	c.gs.Logger.Infof("All done!")
	return nil
}

// flagSet assembles the flags accepted by the `k6 coordinator` sub-command:
// the shared option and runtime option flags, followed by the
// coordinator-specific --grpc-addr and --instance-count flags, whose values
// are stored in c.
func (c *cmdCoordinator) flagSet() *pflag.FlagSet {
	fs := pflag.NewFlagSet("", pflag.ContinueOnError)
	fs.SortFlags = false // keep the flags in the order they are added below

	// Flags shared with other k6 sub-commands.
	options := optionFlagSet()
	fs.AddFlagSet(options)
	runtimeOptions := runtimeOptionFlagSet(false)
	fs.AddFlagSet(runtimeOptions)

	// Flags specific to the coordinator.
	fs.StringVar(&c.gRPCAddress, "grpc-addr", "localhost:6566", "address on which to bind the gRPC server")
	fs.IntVar(&c.instanceCount, "instance-count", 1, "number of distributed instances")

	return fs
}

// getCmdCoordnator builds the hidden `k6 coordinator` sub-command and
// attaches the coordinator flags to it.
func getCmdCoordnator(gs *state.GlobalState) *cobra.Command {
	coordinator := &cmdCoordinator{gs: gs}

	command := &cobra.Command{
		Use:    "coordinator",
		Short:  "Start a distributed load test",
		Long:   `TODO`,
		RunE:   coordinator.run,
		Hidden: true, // TODO: remove when officially released
	}

	// Keep the flags in declaration order instead of sorting them.
	cmdFlags := command.Flags()
	cmdFlags.SortFlags = false
	cmdFlags.AddFlagSet(coordinator.flagSet())

	return command
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func newRootCommand(gs *state.GlobalState) *rootCommand {
getCmdArchive, getCmdCloud, getCmdConvert, getCmdInspect,
getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun,
getCmdStats, getCmdStatus, getCmdVersion,
getCmdAgent, getCmdCoordnator,
}

for _, sc := range subCommands {
Expand Down
Loading

0 comments on commit a9ece8a

Please sign in to comment.