From bbc8edf8038403369c336ce42719ab88acbcf115 Mon Sep 17 00:00:00 2001
From: aavarghese
Date: Wed, 13 Nov 2024 17:40:40 -0500
Subject: [PATCH] feat: ibmcloud backend to use local setup rather than k8s kind cluster

Signed-off-by: aavarghese
---
 cmd/subcommands/component.go                  |   1 +
 cmd/subcommands/component/run-locally.go      |  42 ++++
 cmd/subcommands/component/worker/run.go       |   2 +
 cmd/subcommands/queue/cat.go                  |   2 +-
 cmd/subcommands/queue/drain.go                |   2 +-
 cmd/subcommands/queue/ls.go                   |   2 +-
 cmd/subcommands/queue/upload.go               |   2 +-
 cmd/subcommands/up.go                         |   9 +-
 .../code/header-cleanser/requirements.txt     |   2 +-
 .../language/lang-id/requirements.txt         |   2 +-
 .../universal/doc-id/requirements.txt         |   2 +-
 pkg/be/backend.go                             |   2 +-
 pkg/be/ibmcloud/create.go                     | 217 ++++++++++--------
 pkg/be/ibmcloud/queue.go                      |   9 +-
 pkg/be/kubernetes/queue.go                    |   2 +-
 pkg/be/local/queue.go                         |   2 +-
 pkg/be/local/shell/job.go                     |   5 +-
 pkg/be/local/shell/spawn.go                   |   8 +-
 pkg/be/local/spawn.go                         |   6 +-
 pkg/be/local/up.go                            |   8 +-
 pkg/boot/alldone.go                           |   4 +-
 pkg/boot/down.go                              |   2 +-
 pkg/boot/failures.go                          |   4 +-
 pkg/boot/io.go                                |   4 +-
 pkg/boot/up.go                                |  37 ++-
 pkg/ir/llir/options.go                        |  11 +-
 pkg/observe/qstat/stream.go                   |   2 +-
 pkg/runtime/builtins/cat.go                   |   4 +-
 pkg/runtime/needs/install_requirements.go     |  21 +-
 pkg/runtime/queue/client.go                   |   4 +-
 pkg/runtime/queue/drain.go                    |   4 +-
 pkg/runtime/queue/ls.go                       |   4 +-
 pkg/runtime/queue/qcat.go                     |   4 +-
 pkg/runtime/queue/upload.go                   |   5 +-
 pkg/runtime/run-locally.go                    |  70 ++++++
 pkg/runtime/worker/options.go                 |   3 +
 pkg/runtime/worker/process-task.go            |   6 +-
 pkg/util/b64.go                               |  22 +-
 38 files changed, 374 insertions(+), 164 deletions(-)
 create mode 100644 cmd/subcommands/component/run-locally.go
 create mode 100644 pkg/runtime/run-locally.go

diff --git a/cmd/subcommands/component.go b/cmd/subcommands/component.go
index aaca1826..934e6314 100644
--- a/cmd/subcommands/component.go
+++ b/cmd/subcommands/component.go
@@ -17,4 +17,5 @@ func init() {
 	cmd.AddCommand(component.Minio())
 	cmd.AddCommand(component.Worker())
 	cmd.AddCommand(component.WorkStealer())
+	cmd.AddCommand(component.RunLocally())
 }
diff --git a/cmd/subcommands/component/run-locally.go b/cmd/subcommands/component/run-locally.go
new file mode 100644
index 00000000..1b262d28
--- /dev/null
+++ b/cmd/subcommands/component/run-locally.go
@@ -0,0 +1,42 @@
+package component
+
+import (
+	"context"
+
+	"github.com/spf13/cobra"
+	"lunchpail.io/cmd/options"
+	"lunchpail.io/pkg/build"
+	"lunchpail.io/pkg/runtime"
+)
+
+type RunLocallyOptions struct {
+	Component string
+	LLIR      string
+	build.LogOptions
+}
+
+func AddRunLocallyOptions(cmd *cobra.Command) *RunLocallyOptions {
+	options := RunLocallyOptions{}
+	cmd.Flags().StringVarP(&options.Component, "component", "", "", "base64-encoded, gzipped JSON of the component to run")
+	cmd.Flags().StringVar(&options.LLIR, "llir", "", "base64-encoded, gzipped JSON of the lower-level IR (LLIR)")
+	cmd.MarkFlagRequired("component")
+	cmd.MarkFlagRequired("llir")
+	return &options
+}
+
+func RunLocally() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "run-locally",
+		Short: "Run a component locally",
+		Long:  "Run a component locally",
+	}
+
+	runOpts := AddRunLocallyOptions(cmd)
+	options.AddLogOptions(cmd)
+
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		return runtime.RunLocally(context.Background(), runOpts.Component, runOpts.LLIR, runOpts.LogOptions)
+	}
+
+	return cmd
+}
diff --git a/cmd/subcommands/component/worker/run.go b/cmd/subcommands/component/worker/run.go
index dc33f9f2..3761e886 100644
--- a/cmd/subcommands/component/worker/run.go
+++ b/cmd/subcommands/component/worker/run.go
@@ -3,6 +3,7 @@ package worker
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/spf13/cobra"
 
@@ -64,6 +65,7 @@ func Run() *cobra.Command {
 			PollingInterval: pollingInterval,
 			LogOptions:      *logOpts,
 			RunContext:      run.ForStep(step).ForPool(poolName).ForWorker(workerName),
+			WorkerStartTime: time.Now(),
 		})
 	}
diff --git a/cmd/subcommands/queue/cat.go b/cmd/subcommands/queue/cat.go
index 6c633404..bb15ebfe 100644
--- a/cmd/subcommands/queue/cat.go
+++ b/cmd/subcommands/queue/cat.go
@@ -41,7 +41,7 @@ func Cat() *cobra.Command {
 			return err
 		}
 
-		return queue.Qcat(ctx, backend, run, args[0], *opts.Log)
+		return queue.Qcat(ctx, backend, run, args[0], q.Spec{}, *opts.Log)
 	}
 
 	return cmd
diff --git a/cmd/subcommands/queue/drain.go b/cmd/subcommands/queue/drain.go
index 87579d1d..2c432a57 100644
--- a/cmd/subcommands/queue/drain.go
+++ b/cmd/subcommands/queue/drain.go
@@ -43,7 +43,7 @@ func Drain() *cobra.Command {
 			return err
 		}
 
-		return queue.Drain(ctx, backend, run.ForStep(step), *opts.Log)
+		return queue.Drain(ctx, backend, run.ForStep(step), q.Spec{}, *opts.Log)
 	}
 
 	return cmd
diff --git a/cmd/subcommands/queue/ls.go b/cmd/subcommands/queue/ls.go
index 29a3cede..72c99f77 100644
--- a/cmd/subcommands/queue/ls.go
+++ b/cmd/subcommands/queue/ls.go
@@ -60,7 +60,7 @@ func Ls() *cobra.Command {
 			return err
 		}
 
-		files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, *opts.Log)
+		files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, q.Spec{}, *opts.Log)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/subcommands/queue/upload.go b/cmd/subcommands/queue/upload.go
index 31b2d211..ec934482 100644
--- a/cmd/subcommands/queue/upload.go
+++ b/cmd/subcommands/queue/upload.go
@@ -47,7 +47,7 @@ func Upload() *cobra.Command {
 			run = rrun.Name
 		}
 
-		return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, *opts.Log)
+		return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, q.Spec{}, *opts.Log)
 	}
 
 	return cmd
diff --git a/cmd/subcommands/up.go b/cmd/subcommands/up.go
index 8302f281..85de7a9f 100644
--- a/cmd/subcommands/up.go
+++ b/cmd/subcommands/up.go
@@ -4,7 +4,9 @@ package subcommands
 
 import (
 	"context"
+	"fmt"
 	"os"
+	"time"
 
 	"github.com/spf13/cobra"
 
@@ -68,7 +70,12 @@ func newUpCmd() *cobra.Command {
 			return err
 		}
 
-		_, err = boot.Up(ctx, backend, boot.UpOptions{BuildOptions: *buildOpts, DryRun: dryrunFlag, Watch: watchFlag, WatchUtil: watchFlag, Inputs: args, Executable: os.Args[0], NoRedirect: noRedirect})
+		upStartTime := time.Now()
+		_, err = boot.Up(ctx, backend, boot.UpOptions{BuildOptions: *buildOpts, DryRun: dryrunFlag, Watch: watchFlag, WatchUtil: watchFlag, Inputs: args, Executable: os.Args[0], NoRedirect: noRedirect, UpStartTime: upStartTime})
+		upEndTime := time.Now()
+		if buildOpts.Verbose() {
+			fmt.Fprintf(os.Stderr, "METRICS: Took %s for running app e2e\n", util.RelTime(upStartTime, upEndTime))
+		}
 		return err
 	}
diff --git a/demos/data-prep-kit/code/header-cleanser/requirements.txt b/demos/data-prep-kit/code/header-cleanser/requirements.txt
index 7f108d51..42ad54ed 100644
--- a/demos/data-prep-kit/code/header-cleanser/requirements.txt
+++ b/demos/data-prep-kit/code/header-cleanser/requirements.txt
@@ -3,4 +3,4 @@ scancode-toolkit-mini
 
 # we can probably update to 18+, but we will have to re-generate expected output as pyarrow 18 seems to have resulted in a binary format change
 pyarrow<17
-setuptools
+setuptools ; platform_system == 'Darwin'
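
Note on the requirements.txt change above and the two that follow: the "; platform_system == 'Darwin'" suffix is a PEP 508 environment marker. pip evaluates the marker at install time and skips the requirement when it is false, so setuptools is only pulled in on macOS hosts (where these demos need it for tests). A minimal sketch of the marker syntax; "somepkg" is an illustrative name, not part of this patch:

    setuptools ; platform_system == 'Darwin'    # installed only on macOS
    somepkg==1.0 ; python_version < '3.12'      # installed only on older interpreters
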
diff --git a/demos/data-prep-kit/language/lang-id/requirements.txt b/demos/data-prep-kit/language/lang-id/requirements.txt
index e0cba5e6..70ac9d65 100644
--- a/demos/data-prep-kit/language/lang-id/requirements.txt
+++ b/demos/data-prep-kit/language/lang-id/requirements.txt
@@ -1,6 +1,6 @@
 # hmm, needed for tests at least on macos
 wheel
-setuptools
+setuptools ; platform_system == 'Darwin'
 
 fasttext-wheel==0.9.2
 langcodes==3.3.0
diff --git a/demos/data-prep-kit/universal/doc-id/requirements.txt b/demos/data-prep-kit/universal/doc-id/requirements.txt
index 1c5de610..75b81a14 100644
--- a/demos/data-prep-kit/universal/doc-id/requirements.txt
+++ b/demos/data-prep-kit/universal/doc-id/requirements.txt
@@ -1,2 +1,2 @@
 pyarrow<17
-setuptools
+setuptools ; platform_system == 'Darwin'
diff --git a/pkg/be/backend.go b/pkg/be/backend.go
index c8ee5351..8672e090 100644
--- a/pkg/be/backend.go
+++ b/pkg/be/backend.go
@@ -34,7 +34,7 @@ type Backend interface {
 	InstanceCount(ctx context.Context, c lunchpail.Component, run queue.RunContext) (int, error)
 
 	// Queue properties for a given run, plus ensure access to the endpoint from this client
-	AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)
+	AccessQueue(ctx context.Context, run queue.RunContext, queue queue.Spec, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error)
 
 	// Return a streamer
 	Streamer(ctx context.Context, run queue.RunContext) streamer.Streamer
diff --git a/pkg/be/ibmcloud/create.go b/pkg/be/ibmcloud/create.go
index cb9af3be..4b8154c2 100644
--- a/pkg/be/ibmcloud/create.go
+++ b/pkg/be/ibmcloud/create.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"math"
 	"net/http"
+	"os"
 	"os/exec"
 	"strconv"
 	"strings"
@@ -17,11 +18,9 @@ import (
 	"github.com/elotl/cloud-init/config"
 	"golang.org/x/crypto/ssh"
 	"golang.org/x/sync/errgroup"
+	q "lunchpail.io/pkg/ir/queue"
 
-	"lunchpail.io/pkg/be/kubernetes"
-	"lunchpail.io/pkg/be/kubernetes/common"
 	"lunchpail.io/pkg/ir/llir"
-	"lunchpail.io/pkg/lunchpail"
 	"lunchpail.io/pkg/util"
 )
 
@@ -51,7 +50,7 @@ func (i *intCounter) inc() {
 	i.lock.Unlock()
 }
 
-func createInstance(vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, c llir.ShellComponent, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options) (*vpcv1.Instance, error) {
+func createInstance(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options, cc *config.CloudConfig) (*vpcv1.Instance, error) {
 	networkInterfacePrototypeModel := &vpcv1.NetworkInterfacePrototype{
 		Name: &name,
 		Subnet: &vpcv1.SubnetIdentityByID{
 			ID: &subnetID,
 		},
 		}},
 	}
 
-	// TODO pass through actual Cli Options?
-	opts := common.Options{Options: copts}
-
-	appYamlString, err := kubernetes.MarshalComponentAsStandalone(ir, c, namespace, opts)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshall yaml: %v", err)
-	}
-	cc := &config.CloudConfig{
-		WriteFiles: []config.File{
-			{
-				Path:               "/app.yaml",
-				Content:            appYamlString,
-				Owner:              "root:root",
-				RawFilePermissions: "0644",
-			}},
-		RunCmd: []string{"sleep 10", //Minimum of 10 seconds needed for cluster to be able to run `apply`
-			"while ! kind get clusters | grep lunchpail; do sleep 2; done",
-			"echo 'Kind cluster is ready'",
-			"env HOME=/root kubectl create ns " + namespace,
-			"n=0; until [ $n -ge 60 ]; do env HOME=/root kubectl get serviceaccount default -o name -n " + namespace + " && break; n=$((n + 1)); sleep 1; done",
-			"env HOME=/root kubectl create -f /app.yaml -n " + namespace},
-	}
-
 	instancePrototypeModel := &vpcv1.InstancePrototypeInstanceByImage{
 		Name: &name,
 		ResourceGroup: &vpcv1.ResourceGroupIdentity{
 			ID: &resourceGroupID,
 		},
@@ -236,7 +212,7 @@ func createSSHKey(vpcService *vpcv1.VpcV1, name string, resourceGroupID string,
 	}
 	key, response, err := vpcService.CreateKey(options)
 	if err != nil {
-		if response.StatusCode == http.StatusBadRequest && err.Error() == "Key with fingerprint already exists" {
+		if (response.StatusCode == http.StatusBadRequest || response.StatusCode == http.StatusConflict) && err.Error() == "Key with fingerprint already exists" {
 			//get fingerprint of input public key
 			sshPubKey, _, _, _, _ := ssh.ParseAuthorizedKey([]byte(pubKey))
 			keyFingerprint := ssh.FingerprintSHA256(sshPubKey)
@@ -273,104 +249,153 @@ func createVPC(vpcService *vpcv1.VpcV1, name string, appName string, resourceGro
 	return *vpc.ID, nil
 }
 
-func createAndInitVM(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) error {
+func createImage(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vmID string) (string, error) {
+	options := &vpcv1.CreateImageOptions{
+		ImagePrototype: &vpcv1.ImagePrototype{
+			Name: &name,
+			ResourceGroup: &vpcv1.ResourceGroupIdentity{
+				ID: &resourceGroupID,
+			},
+			SourceVolume: &vpcv1.VolumeIdentityByID{
+				ID: &vmID,
+			},
+		},
+	}
+	image, response, err := vpcService.CreateImage(options)
+	if err != nil {
+		return "", fmt.Errorf("failed to create an Image: %v and the response is: %s", err, response)
+	}
+	return *image.ID, nil
+}
+
+func createResources(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) (string, error) {
+	var instanceID string
 	t1s := time.Now()
 	vpcID, err := createVPC(vpcService, name, ir.AppName, resourceGroupID)
 	if err != nil {
-		return err
+		return "", err
 	}
 	t1e := time.Now()
 
 	t2s := t1e
 	keyID, err := createSSHKey(vpcService, name, resourceGroupID, keyType, publicKey)
 	if err != nil {
-		return err
+		return "", err
 	}
 	t2e := time.Now()
 
 	t3s := t2e
 	subnetID, err := createSubnet(vpcService, name, resourceGroupID, vpcID, zone)
 	if err != nil {
-		return err
+		return "", err
 	}
 	t3e := time.Now()
 
 	t4s := t3e
 	secGroupID, err := createSecurityGroup(vpcService, name, resourceGroupID, vpcID)
 	if err != nil {
-		return err
+		return "", err
 	}
 	t4e := time.Now()
 
 	t5s := t4e
 	if err = createSecurityGroupRule(vpcService, secGroupID); err != nil {
-		return err
+		return "", err
 	}
 	t5e := time.Now()
 
-	group, _ := errgroup.WithContext(ctx)
 	t6s := time.Now()
-	// One Component for WorkStealer, one for Dispatcher, and each per WorkerPool
-	poolCount := intCounter{}
+	if err = createVMForComponents(ctx, vpcService, name, ir, resourceGroupID, zone, profile, imageID, namespace, vpcID, keyID, subnetID, secGroupID, opts, t6s); err != nil {
+		return "", err
+	}
+	t6e := time.Now()
+
+	if opts.Log.Verbose {
+		fmt.Fprintf(os.Stderr, "Setup done %s\n", util.RelTime(t1s, t6e))
+		fmt.Fprintf(os.Stderr, " - VPC %s\n", util.RelTime(t1s, t1e))
+		fmt.Fprintf(os.Stderr, " - SSH %s\n", util.RelTime(t2s, t2e))
+		fmt.Fprintf(os.Stderr, " - Subnet %s\n", util.RelTime(t3s, t3e))
+		fmt.Fprintf(os.Stderr, " - SecurityGroup %s\n", util.RelTime(t4s, t4e))
+		fmt.Fprintf(os.Stderr, " - SecurityGroupRule %s\n", util.RelTime(t5s, t5e))
+		fmt.Fprintf(os.Stderr, " - VMs %s\n", util.RelTime(t6s, t6e))
+	}
+	return instanceID, nil
+}
+
+func createVMForComponents(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, zone string, profile string, imageID string, namespace string, vpcID string, keyID string, subnetID string, secGroupID string, opts llir.Options, provTime time.Time) error {
+	group, _ := errgroup.WithContext(ctx)
+	var verboseFlag string
+
 	for _, c := range ir.Components {
 		instanceName := name + "-" + string(c.C())
-		group.Go(func() error {
-			if c.C() == lunchpail.DispatcherComponent || c.C() == lunchpail.WorkStealerComponent {
-				instance, err := createInstance(vpcService, instanceName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts)
-				if err != nil {
-					return err
-				}
+		if opts.Log.Verbose {
+			fmt.Fprintf(os.Stderr, "Creating VM %s\n", instanceName)
+		}
 
-				//TODO VSI instances other than jumpbox or main pod should not have floatingIP. Remove below after testing
-				floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone)
-				if err != nil {
-					return err
-				}
+		componentB64, err := util.ToJsonGzipB64(c)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+			return err
+		}
 
-				options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
-					ID:                 &floatingIPID,
-					InstanceID:         instance.ID,
-					NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
-				}
-				_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
-				if err != nil {
-					return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
-				}
+		llirB64, err := util.ToJsonGzipB64(ir)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+			return err
+		}
 
-			} else if c.C() == lunchpail.WorkersComponent {
-				poolCount.inc()
-				workerCount := c.Workers()
-				poolName := instanceName + strconv.Itoa(poolCount.counter) //multiple worker pools, maybe
-
-				//Compute number of VSIs to be provisioned and job parallelism for each VSI
-				parallelism, numInstances, err := computeParallelismAndInstanceCount(vpcService, profile, int32(workerCount))
-				if err != nil {
-					return fmt.Errorf("failed to compute number of instances and job parallelism: %v", err)
-				}
+		if opts.Log.Verbose {
+			verboseFlag = "--verbose"
+		}
 
-				for i := 0; i < numInstances; i++ {
-					workerName := poolName + "-" + strconv.Itoa(i) //multiple worker instances
-					c = c.SetWorkers(int(parallelism[i]))
-					instance, err := createInstance(vpcService, workerName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts)
-					if err != nil {
-						return err
-					}
-
-					floatingIPID, err := createFloatingIP(vpcService, workerName, resourceGroupID, zone)
-					if err != nil {
-						return err
-					}
-
-					options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
-						ID:                 &floatingIPID,
-						InstanceID:         instance.ID,
-						NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
-					}
-					_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
-					if err != nil {
-						return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
-					}
-				}
-			}
+		cc := &config.CloudConfig{
+			RunCmd: []string{"s=$(date +%s)",
+				//"echo \"METRICS: Took $(($s-" + strconv.FormatInt(provTime.Unix(), 10) + ")) seconds for booting up VM from provision start time\"",
+				"echo \"METRICS: Took $(($s-" + strconv.FormatInt(opts.UpStartTime.Unix(), 10) + ")) seconds for booting up VM from client Up time\"",
+				"apt-get install curl jq -y",
+				"curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /minio-binaries/mc",
+				"chmod +x /minio-binaries/mc",
+				"export PATH=$PATH:/minio-binaries/:/usr/bin/python3",
+				"mc alias set myminio " + ir.Context.Queue.Endpoint + " " + ir.Context.Queue.AccessKey + " " + ir.Context.Queue.SecretKey,
+				"exec=$(mc stat myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/ --json | jq -r '.name')",
+				"mc get myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/$exec /lunchpail",
+				"chmod +x /lunchpail",
+				"s=$(date +%s)",
+				"env HOME=/root /lunchpail component run-locally --component " + componentB64 + " --llir " + llirB64 + " " + verboseFlag,
+				"e=$(date +%s)",
+				"echo \"METRICS: Took $(($e-$s)) seconds for running LP component\""},
+		}
+
+		//TODO: Compute number of VSIs to be provisioned and job parallelism for each VSI based on number of workers and workerpools
+		group.Go(func() error {
+			t6s := time.Now()
+			instance, err := createInstance(vpcService, instanceName, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts, cc)
+			if err != nil {
+				return err
+			}
+			t6e := time.Now()
+			if opts.Log.Verbose {
+				fmt.Fprintf(os.Stderr, " - VSI for %s took %s\n", instanceName, util.RelTime(t6s, t6e))
+			}
+			t7s := time.Now()
+			floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone)
+			if err != nil {
+				return err
+			}
+
+			options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{
+				ID:                 &floatingIPID,
+				InstanceID:         instance.ID,
+				NetworkInterfaceID: instance.PrimaryNetworkInterface.ID,
+			}
+			_, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options)
+			if err != nil {
+				return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response)
+			}
+			t7e := time.Now()
+			if opts.Log.Verbose {
+				fmt.Fprintf(os.Stderr, " - Floating IP for %s took %s\n", instanceName, util.RelTime(t7s, t7e))
+			}
 			return nil
 		})
@@ -378,18 +403,8 @@
 	if err := group.Wait(); err != nil {
 		return err
 	}
-	t6e := time.Now()
-
-	fmt.Printf("Setup done %s\n", util.RelTime(t1s, t6e))
-	fmt.Printf(" - VPC %s\n", util.RelTime(t1s, t1e))
-	fmt.Printf(" - SSH %s\n", util.RelTime(t2s, t2e))
-	fmt.Printf(" - Subnet %s\n", util.RelTime(t3s, t3e))
-	fmt.Printf(" - SecurityGroup %s\n", util.RelTime(t4s, t4e))
-	fmt.Printf(" - SecurityGroupRule %s\n", util.RelTime(t5s, t5e))
-	fmt.Printf(" - VMs %s\n", util.RelTime(t6s, t6e))
 	return nil
 }
-
 func (backend Backend) SetAction(ctx context.Context, opts llir.Options, ir llir.LLIR, action Action) error {
 	runname := ir.RunName()
 
@@ -406,7 +421,7 @@
 		}
 		zone = randomZone
 	}
-	if err := createAndInitVM(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil {
+	if _, err := createResources(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil {
 		return err
 	}
 }
diff --git a/pkg/be/ibmcloud/queue.go b/pkg/be/ibmcloud/queue.go
index d8b58cd5..c5e29c2e 100644
--- a/pkg/be/ibmcloud/queue.go
+++ b/pkg/be/ibmcloud/queue.go
@@ -2,14 +2,17 @@ package ibmcloud
 
 import (
 	"context"
-	"fmt"
 
 	"lunchpail.io/pkg/build"
 	"lunchpail.io/pkg/ir/queue"
 )
 
 // Queue properties for a given run, plus ensure access to the endpoint from this client
-func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
-	err = fmt.Errorf("Unsupported operation: 'AccessQueue'")
+func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, queue queue.Spec, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+	endpoint = queue.Endpoint
+	accessKeyID = queue.AccessKey
+	secretAccessKey = queue.SecretKey
+	bucket = queue.Bucket
+	stop = func() {}
 	return
 }
diff --git a/pkg/be/kubernetes/queue.go b/pkg/be/kubernetes/queue.go
index 567b0c91..9dd42d47 100644
--- a/pkg/be/kubernetes/queue.go
+++ b/pkg/be/kubernetes/queue.go
@@ -16,7 +16,7 @@ import (
 )
 
 // Queue properties for a given run, plus ensure access to the endpoint from this client
-func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, queue queue.Spec, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
 	endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run)
 	if err != nil {
 		return
diff --git a/pkg/be/local/queue.go b/pkg/be/local/queue.go
index 6dd2bbe0..65a6af04 100644
--- a/pkg/be/local/queue.go
+++ b/pkg/be/local/queue.go
@@ -14,7 +14,7 @@ import (
 )
 
 // Queue properties for a given run, plus ensure access to the endpoint from this client
-func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
+func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, queue queue.Spec, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) {
 	endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run)
 	stop = func() {}
 	return
diff --git a/pkg/be/local/shell/job.go b/pkg/be/local/shell/job.go
index 4791f70f..7f1136b3 100644
--- a/pkg/be/local/shell/job.go
+++ b/pkg/be/local/shell/job.go
@@ -11,16 +11,15 @@ import (
 )
 
 // Run the component as a "job", with multiple workers
-func SpawnJob(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error {
+func SpawnJob(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error {
 	if c.InitialWorkers < 1 {
 		return fmt.Errorf("Invalid worker count %d for %v", c.InitialWorkers, c.C())
 	}
-
 	group, jobCtx := errgroup.WithContext(ctx)
 	for workerIdx := range c.InitialWorkers {
 		group.Go(func() error {
-			return Spawn(jobCtx, c.WithInstanceName(fmt.Sprintf("w%d", workerIdx)), ir, logdir, opts)
+			return Spawn(jobCtx, c.WithInstanceName(fmt.Sprintf("w%d", workerIdx)), ir, opts)
 		})
 	}
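
Note: with this change, every backend's AccessQueue receives the queue.Spec recorded in the LLIR context, which is how the ibmcloud backend (previously "Unsupported operation") can answer without reaching into a cluster: it simply echoes back the endpoint, keys, and bucket that the client provisioned. A caller sketch in Go; the variable names are illustrative, not from this patch:

	// ir is an llir.LLIR; backend is any be.Backend implementation
	endpoint, accessKeyID, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, ir.Context.Run, ir.Context.Queue, logOpts)
	if err != nil {
		return err
	}
	defer stop() // a no-op for the local and ibmcloud backends, per the diffs above

Standalone CLI paths (queue cat/drain/ls/upload) pass an empty q.Spec{}; the Kubernetes and local backends ignore the spec and discover the queue themselves.
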
diff --git a/pkg/be/local/shell/spawn.go b/pkg/be/local/shell/spawn.go
index 794c2554..311fa9fa 100644
--- a/pkg/be/local/shell/spawn.go
+++ b/pkg/be/local/shell/spawn.go
@@ -17,7 +17,7 @@ import (
 	"lunchpail.io/pkg/ir/llir"
 )
 
-func Spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error {
+func Spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error {
 	pidfile, err := files.Pidfile(ir.Context.Run, c.InstanceName, c.C(), true)
 	if err != nil {
 		return err
@@ -29,6 +29,12 @@
 	}
 	defer os.RemoveAll(workdir)
 
+	// This is where component logs will go
+	logdir, err := files.LogDir(ir.Context.Run, true)
+	if err != nil {
+		return err
+	}
+
 	// tee command output to the logdir
 	instance := strings.Replace(strings.Replace(c.InstanceName, ir.RunName(), "", 1), "--", "-", 1)
 	logfile := files.LogFileForComponent(c.C())
diff --git a/pkg/be/local/spawn.go b/pkg/be/local/spawn.go
index 97ba604e..b2efb679 100644
--- a/pkg/be/local/spawn.go
+++ b/pkg/be/local/spawn.go
@@ -8,10 +8,10 @@ import (
 	"lunchpail.io/pkg/ir/llir"
 )
 
-func (backend Backend) spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error {
+func (backend Backend) spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error {
 	if c.RunAsJob {
-		return shell.SpawnJob(ctx, c, ir, logdir, opts)
+		return shell.SpawnJob(ctx, c, ir, opts)
 	} else {
-		return shell.Spawn(ctx, c, ir, logdir, opts)
+		return shell.Spawn(ctx, c, ir, opts)
 	}
 }
diff --git a/pkg/be/local/up.go b/pkg/be/local/up.go
index ece362a2..e6290d10 100644
--- a/pkg/be/local/up.go
+++ b/pkg/be/local/up.go
@@ -26,12 +26,6 @@ func (backend Backend) Up(octx context.Context, ir llir.LLIR, opts llir.Options,
 		}
 	}
 
-	// This is where component logs will go
-	logdir, err := files.LogDir(ir.Context.Run, true)
-	if err != nil {
-		return err
-	}
-
 	// Write a pid file to indicate the pid of this process
 	if pidfile, err := files.PidfileForMain(ir.Context.Run); err != nil {
 		return err
@@ -47,7 +41,7 @@
 	// Launch each of the components
 	group, ctx := errgroup.WithContext(octx)
 	for _, c := range ir.Components {
-		group.Go(func() error { return backend.spawn(ctx, c, ir, logdir, *opts.Log) })
+		group.Go(func() error { return backend.spawn(ctx, c, ir, *opts.Log) })
 	}
 
 	// Indicate that we are off to the races
diff --git a/pkg/boot/alldone.go b/pkg/boot/alldone.go
index 8bf09ce7..033e8511 100644
--- a/pkg/boot/alldone.go
+++ b/pkg/boot/alldone.go
@@ -12,8 +12,8 @@ import (
 	s3 "lunchpail.io/pkg/runtime/queue"
 )
 
-func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
-	client, err := s3.NewS3ClientForRun(ctx, backend, run, opts)
+func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContext, que queue.Spec, opts build.LogOptions) error {
+	client, err := s3.NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		if strings.Contains(err.Error(), "Connection closed") {
 			// already gone
diff --git a/pkg/boot/down.go b/pkg/boot/down.go
index 1db91d86..fa6185d2 100644
--- a/pkg/boot/down.go
+++ b/pkg/boot/down.go
@@ -97,7 +97,7 @@ func Down(ctx context.Context, runname string, backend be.Backend, opts DownOpti
 		return err
 	}
 
-	if err := backend.Down(ctx, ir, copts); err != nil {
+	if err := backend.Down(ctx, ir, llir.Options{Options: copts}); err != nil {
 		return err
 	}
diff --git a/pkg/boot/failures.go b/pkg/boot/failures.go
index bb364826..3a35c85a 100644
--- a/pkg/boot/failures.go
+++ b/pkg/boot/failures.go
@@ -13,8 +13,8 @@ import (
 	s3 "lunchpail.io/pkg/runtime/queue"
 )
 
-func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
-	client, err := s3.NewS3ClientForRun(ctx, backend, run, opts)
+func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, que queue.Spec, opts build.LogOptions) error {
+	client, err := s3.NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/boot/io.go b/pkg/boot/io.go
index 6f854a45..ce293fe5 100644
--- a/pkg/boot/io.go
+++ b/pkg/boot/io.go
@@ -19,7 +19,7 @@ import (
 
 // Behave like `cat inputs | ... > outputs`
 func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir llir.LLIR, alldone <-chan struct{}, noRedirect bool, redirectTo string, opts build.LogOptions) error {
-	client, err := s3.NewS3ClientForRun(ctx, backend, ir.Context.Run, opts)
+	client, err := s3.NewS3ClientForRun(ctx, backend, ir.Context.Run, ir.Context.Queue, opts)
 	if err != nil {
 		return err
 	}
@@ -92,7 +92,7 @@
 
 // For Step > 0, we will need to simulate that a dispatch is done
 func fakeDispatch(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
-	client, err := s3.NewS3ClientForRun(ctx, backend, run, opts)
+	client, err := s3.NewS3ClientForRun(ctx, backend, run, queue.Spec{}, opts)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/boot/up.go b/pkg/boot/up.go
index a0991216..e4fbbb92 100644
--- a/pkg/boot/up.go
+++ b/pkg/boot/up.go
@@ -6,10 +6,13 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"os/exec"
 	"os/signal"
 	"strings"
+	"time"
 
 	"lunchpail.io/pkg/be"
+	"lunchpail.io/pkg/be/target"
 	"lunchpail.io/pkg/build"
 	"lunchpail.io/pkg/fe"
 	"lunchpail.io/pkg/ir/hlir"
@@ -29,6 +32,7 @@ type UpOptions struct {
 	Executable string
 	NoRedirect bool
 	RedirectTo string
+	UpStartTime time.Time
 }
 
 func Up(ctx context.Context, backend be.Backend, opts UpOptions) (llir.Context, error) {
@@ -62,7 +66,7 @@ func UpHLIR(ctx context.Context, backend be.Backend, ir hlir.HLIR, opts UpOption
 
 func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOptions) error {
 	if opts.DryRun {
-		out, err := backend.DryRun(ir, opts.BuildOptions)
+		out, err := backend.DryRun(ir, llir.Options{Options: opts.BuildOptions})
 		if err != nil {
 			return err
 		}
@@ -97,7 +101,7 @@
 		// Now cancel the context
 		cancel()
 
-		if err := backend.Down(ctx, ir, opts.BuildOptions); err != nil {
+		if err := backend.Down(ctx, ir, llir.Options{Options: opts.BuildOptions}); err != nil {
 			fmt.Fprintln(os.Stderr, "Error bringing down run", err)
 		}
 
@@ -151,7 +155,7 @@
 		case <-cancellable.Done():
 		case ctx := <-isRunning6:
 			if ctx.Run.Step == 0 || isFinalStep(ctx) {
-				errorFromAllDone = waitForAllDone(cancellable, backend, ctx.Run, *opts.BuildOptions.Log)
+				errorFromAllDone = waitForAllDone(cancellable, backend, ctx.Run, ctx.Queue, *opts.BuildOptions.Log)
 				if errorFromAllDone != nil && strings.Contains(errorFromAllDone.Error(), "connection refused") {
 					// Then Minio went away on its own. That's probably ok.
 					errorFromAllDone = nil
@@ -215,7 +219,7 @@
 		case <-cancellable.Done():
 		case <-isRunning6:
 		}
-		if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil {
+		if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, ir.Context.Queue, *opts.BuildOptions.Log); err != nil {
 			errorFromTask = err
 			// fail fast?
 			cancel()
@@ -229,15 +233,32 @@
 		case <-cancellable.Done():
 		case <-isRunning6:
 		}
-
-		if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
-			fmt.Fprintln(os.Stderr, err)
+		if opts.BuildOptions.Target.Platform == target.IBMCloud {
+			//rebuilding self to upload linux-amd64 executable
+			buildStartTime := time.Now()
+			cmd := exec.Command("/bin/sh", "-c", opts.Executable+" build -A -o "+opts.Executable)
+			cmd.Stdout = os.Stdout
+			cmd.Stderr = os.Stderr
+			if err := cmd.Run(); err != nil {
+				fmt.Fprintln(os.Stderr, err)
+			}
+			buildEndTime := time.Now()
+			fmt.Fprintf(os.Stderr, "METRICS: Took %s for building executable for platform\n", util.RelTime(buildStartTime, buildEndTime))
+			if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable + "-linux-amd64", TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, ir.Context.Queue, *opts.BuildOptions.Log); err != nil {
+				fmt.Fprintln(os.Stderr, err)
+			}
+			s3UploadEndTime := time.Now()
+			fmt.Fprintf(os.Stderr, "METRICS: Took %s for uploading binary to S3\n", util.RelTime(buildEndTime, s3UploadEndTime))
+		} else {
+			if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, ir.Context.Queue, *opts.BuildOptions.Log); err != nil {
+				fmt.Fprintln(os.Stderr, err)
+			}
 		}
 	}()
 	}
 
 	defer cancel()
-	errorFromUp := backend.Up(cancellable, ir, opts.BuildOptions, isRunning)
+	errorFromUp := backend.Up(cancellable, ir, llir.Options{Options: opts.BuildOptions, UpStartTime: opts.UpStartTime}, isRunning)
 
 	/* TODO
 	defer func() {
 		err := backend.Down(cancellable, ir, opts.BuildOptions)
diff --git a/pkg/ir/llir/options.go b/pkg/ir/llir/options.go
index 1f8552f0..f2e6991c 100644
--- a/pkg/ir/llir/options.go
+++ b/pkg/ir/llir/options.go
@@ -1,5 +1,12 @@
 package llir
 
-import "lunchpail.io/pkg/build"
+import (
+	"time"
 
-type Options = build.Options
+	"lunchpail.io/pkg/build"
+)
+
+type Options struct {
+	build.Options
+	UpStartTime time.Time
+}
diff --git a/pkg/observe/qstat/stream.go b/pkg/observe/qstat/stream.go
index e4ee5347..de0cdf9b 100644
--- a/pkg/observe/qstat/stream.go
+++ b/pkg/observe/qstat/stream.go
@@ -24,7 +24,7 @@ func stream(ctx context.Context, runnameIn string, backend be.Backend, opts Opti
 		fmt.Fprintln(os.Stderr, "Tracking run", runname)
 	}
 
-	client, err := s3.NewS3ClientForRun(ctx, backend, queue.RunContext{RunName: runname, Step: opts.Step}, opts.LogOptions)
+	client, err := s3.NewS3ClientForRun(ctx, backend, queue.RunContext{RunName: runname, Step: opts.Step}, queue.Spec{}, opts.LogOptions)
 	if err != nil {
 		return client.RunContext, nil, nil, nil, err
 	}
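
Two structural notes on the hunks above. First, pkg/ir/llir/options.go turns what was a plain type alias into a wrapper struct; that is why call sites such as backend.DryRun and backend.Down now wrap their build.Options. A caller sketch in Go (values illustrative):

	// llir.Options embeds build.Options, so existing field accesses still
	// work via promotion, and provisioning code can read opts.UpStartTime.
	opts := llir.Options{Options: buildOpts, UpStartTime: time.Now()}

Second, on the IBM Cloud path in pkg/boot/up.go, the client re-invokes its own "build" subcommand so that a linux-amd64 binary exists, then uploads the resulting opts.Executable + "-linux-amd64" artifact to the run's blob area in S3; the VM's cloud-init script later fetches that blob with mc and executes it, regardless of the client's own OS and architecture.
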
diff --git a/pkg/runtime/builtins/cat.go b/pkg/runtime/builtins/cat.go
index 50a8a413..f3607392 100644
--- a/pkg/runtime/builtins/cat.go
+++ b/pkg/runtime/builtins/cat.go
@@ -26,8 +26,8 @@ func Cat(ctx context.Context, client s3.S3Client, run queue.RunContext, inputs [
 	return nil
 }
 
-func CatClient(ctx context.Context, backend be.Backend, run queue.RunContext, inputs []string, opts build.LogOptions) error {
-	client, err := s3.NewS3ClientForRun(ctx, backend, run, opts)
+func CatClient(ctx context.Context, backend be.Backend, run queue.RunContext, que queue.Spec, inputs []string, opts build.LogOptions) error {
+	client, err := s3.NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/runtime/needs/install_requirements.go b/pkg/runtime/needs/install_requirements.go
index e14ba974..48a03ae9 100644
--- a/pkg/runtime/needs/install_requirements.go
+++ b/pkg/runtime/needs/install_requirements.go
@@ -9,6 +9,9 @@ import (
 	"os/exec"
 	"path/filepath"
 	"syscall"
+	"time"
+
+	"lunchpail.io/pkg/util"
 )
 
 func requirementsInstall(ctx context.Context, version, requirements string, verbose bool) (string, error) {
@@ -85,10 +88,19 @@ func requirementsInstall(ctx context.Context, version, requirements string, verb
 		version = "3"
 	}
 
-	cmdline := fmt.Sprintf(`python%s -m venv %s
+	apt := ""
+	if _, err := exec.LookPath("apt"); err == nil {
+		apt = fmt.Sprintf(`apt install -y python%s-venv python%s-distutils`, version, version)
+	}
+
+	cmdline := fmt.Sprintf(`%s
+python%s -m venv %s
 source %s/bin/activate
 if ! which pip%s > /dev/null; then python%s -m pip install pip %s; fi
-pip%s install %s %s -r %s %s 1>&2`, version, venvPath, venvPath, version, version, verboseFlag, version, nocache, quiet, reqmtsFile.Name(), verboseFlag)
+s=%s
+pip%s install %s %s -r %s %s 1>&2
+e=%s
+echo \"METRICS: Took $(($e-$s)) seconds for pip installs\"`, apt, version, venvPath, venvPath, version, version, verboseFlag, "$(date +%s)", version, nocache, quiet, reqmtsFile.Name(), verboseFlag, "$(date +%s)")
 
 	cmd := exec.CommandContext(ctx, "/bin/bash", "-c", cmdline)
 	cmd.Dir = filepath.Dir(venvPath)
@@ -109,6 +121,7 @@
 		}
 	}()
 
+	t1s := time.Now()
 	if err := cmd.Run(); err != nil {
 		// Clean up the venv cache directory, since we failed at populating it
 		if err := os.RemoveAll(venvPath); err != nil {
@@ -118,6 +131,10 @@
 		return path, err
 	}
 	installSuccessful = true
+	t1e := time.Now()
+	if verbose {
+		fmt.Fprintf(os.Stderr, "METRICS: Took %s for pip installs\n", util.RelTime(t1s, t1e))
+	}
 
 	return path, nil
 }
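
The fmt.Sprintf above interleaves shell and Go, which can be hard to read. For reference, with version "3", an illustrative venv path /tmp/venv, and empty nocache/quiet/verbose flags, the generated script comes out roughly as (paths are placeholders, not from this patch):

	apt install -y python3-venv python3-distutils
	python3 -m venv /tmp/venv
	source /tmp/venv/bin/activate
	if ! which pip3 > /dev/null; then python3 -m pip install pip ; fi
	s=$(date +%s)
	pip3 install   -r /tmp/requirements.txt  1>&2
	e=$(date +%s)
	echo \"METRICS: Took $(($e-$s)) seconds for pip installs\"

Note that the \" sequences are literal inside the Go raw string, so bash prints the METRICS line wrapped in double quotes.
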
diff --git a/pkg/runtime/queue/client.go b/pkg/runtime/queue/client.go
index 2bcf16e4..9729f3a1 100644
--- a/pkg/runtime/queue/client.go
+++ b/pkg/runtime/queue/client.go
@@ -70,8 +70,8 @@ func NewS3ClientFromOptions(ctx context.Context, opts S3ClientOptions) (S3Client
 }
 
 // Client for a given run in the given backend
-func NewS3ClientForRun(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) (S3ClientStop, error) {
-	endpoint, accessKeyId, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, run, opts)
+func NewS3ClientForRun(ctx context.Context, backend be.Backend, run queue.RunContext, queue queue.Spec, opts build.LogOptions) (S3ClientStop, error) {
+	endpoint, accessKeyId, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, run, queue, opts)
 	if err != nil {
 		return S3ClientStop{}, err
 	}
diff --git a/pkg/runtime/queue/drain.go b/pkg/runtime/queue/drain.go
index ccb6d706..7b339561 100644
--- a/pkg/runtime/queue/drain.go
+++ b/pkg/runtime/queue/drain.go
@@ -13,8 +13,8 @@ import (
 )
 
 // Drain the output tasks, allowing graceful termination
-func Drain(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
-	c, err := NewS3ClientForRun(ctx, backend, run, opts)
+func Drain(ctx context.Context, backend be.Backend, run queue.RunContext, que queue.Spec, opts build.LogOptions) error {
+	c, err := NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/runtime/queue/ls.go b/pkg/runtime/queue/ls.go
index 3eb27e4b..939e3fe5 100644
--- a/pkg/runtime/queue/ls.go
+++ b/pkg/runtime/queue/ls.go
@@ -10,8 +10,8 @@ import (
 	"lunchpail.io/pkg/ir/queue"
 )
 
-func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path string, opts build.LogOptions) (<-chan string, <-chan error, error) {
-	c, err := NewS3ClientForRun(ctx, backend, run, opts)
+func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path string, que queue.Spec, opts build.LogOptions) (<-chan string, <-chan error, error) {
+	c, err := NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/pkg/runtime/queue/qcat.go b/pkg/runtime/queue/qcat.go
index be26a47e..aabb2444 100644
--- a/pkg/runtime/queue/qcat.go
+++ b/pkg/runtime/queue/qcat.go
@@ -9,8 +9,8 @@ import (
 	"lunchpail.io/pkg/ir/queue"
 )
 
-func Qcat(ctx context.Context, backend be.Backend, run queue.RunContext, path string, opts build.LogOptions) error {
-	c, err := NewS3ClientForRun(ctx, backend, run, opts)
+func Qcat(ctx context.Context, backend be.Backend, run queue.RunContext, path string, que queue.Spec, opts build.LogOptions) error {
+	c, err := NewS3ClientForRun(ctx, backend, run, que, opts)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/runtime/queue/upload.go b/pkg/runtime/queue/upload.go
index 5d6d84ff..04fbee6f 100644
--- a/pkg/runtime/queue/upload.go
+++ b/pkg/runtime/queue/upload.go
@@ -15,14 +15,13 @@ import (
 	"lunchpail.io/pkg/runtime/queue/upload"
 )
 
-func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, specs []upload.Upload, opts build.LogOptions) error {
-	s3, err := NewS3ClientForRun(ctx, backend, run, opts)
+func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, specs []upload.Upload, queue queue.Spec, opts build.LogOptions) error {
+	s3, err := NewS3ClientForRun(ctx, backend, run, queue, opts)
 	if err != nil {
 		return err
 	}
 	defer s3.Stop()
 
 	run.Bucket = s3.RunContext.Bucket // TODO
-
 	for _, spec := range specs {
 		bucket := spec.Bucket
 		if bucket == "" {
diff --git a/pkg/runtime/run-locally.go b/pkg/runtime/run-locally.go
new file mode 100644
index 00000000..d69cd8b0
--- /dev/null
+++ b/pkg/runtime/run-locally.go
@@ -0,0 +1,70 @@
+package runtime
+
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"os"
+
+	"lunchpail.io/pkg/be/local/shell"
+	"lunchpail.io/pkg/build"
+	"lunchpail.io/pkg/ir/llir"
+)
+
+func RunLocally(ctx context.Context, component string, lowerir string, opts build.LogOptions) error {
+	var c llir.ShellComponent
+	var ir llir.LLIR
+
+	var err error
+
+	componentByte, err := base64.StdEncoding.DecodeString(component)
+	if err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+	reader, err := gzip.NewReader(bytes.NewReader(componentByte))
+	if err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+	defer reader.Close()
+
+	if err := json.NewDecoder(reader).Decode(&c); err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+
+	irByte, err := base64.StdEncoding.DecodeString(lowerir)
+	if err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+	reader, err = gzip.NewReader(bytes.NewReader(irByte))
+	if err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+	defer reader.Close()
+
+	if err := json.NewDecoder(reader).Decode(&ir); err != nil {
+		if opts.Verbose {
+			fmt.Fprintf(os.Stderr, "%v\n", err)
+		}
+		return err
+	}
+
+	return shell.Spawn(ctx, c, ir, opts)
+}
diff --git a/pkg/runtime/worker/options.go b/pkg/runtime/worker/options.go
index a921f810..56b78b2f 100644
--- a/pkg/runtime/worker/options.go
+++ b/pkg/runtime/worker/options.go
@@ -1,6 +1,8 @@
 package worker
 
 import (
+	"time"
+
 	"lunchpail.io/pkg/build"
 	"lunchpail.io/pkg/ir/hlir"
 	"lunchpail.io/pkg/ir/queue"
@@ -18,4 +20,5 @@ type Options struct {
 	StartupDelay    int
 	PollingInterval int
 	build.LogOptions
+	WorkerStartTime time.Time
 }
diff --git a/pkg/runtime/worker/process-task.go b/pkg/runtime/worker/process-task.go
index cea347c3..1798324f 100644
--- a/pkg/runtime/worker/process-task.go
+++ b/pkg/runtime/worker/process-task.go
@@ -18,6 +18,7 @@ import (
 
 	"lunchpail.io/pkg/ir/queue"
 	s3 "lunchpail.io/pkg/runtime/queue"
+	"lunchpail.io/pkg/util"
 )
 
 type taskProcessor struct {
@@ -148,10 +149,13 @@ func (p taskProcessor) process(task string) error {
 	handlercmd.Stdin = stdin
 	handlercmd.Stderr = io.MultiWriter(os.Stderr, stderrWriter)
 	handlercmd.Stdout = io.MultiWriter(os.Stdout, stdoutWriter)
+	taskStartTime := time.Now()
+	if p.opts.LogOptions.Verbose {
+		fmt.Fprintf(os.Stderr, "METRICS: Took %s for worker to get to starting task\n", util.RelTime(p.opts.WorkerStartTime, taskStartTime))
+	}
 	if err := handlercmd.Run(); err != nil {
 		fmt.Fprintln(os.Stderr, "Handler launch failed:", err)
 	}
-
 	// Clean things up
 	p.handleExitCode(taskContext, handlercmd.ProcessState.ExitCode())
diff --git a/pkg/util/b64.go b/pkg/util/b64.go
index 630300cd..02c3c699 100644
--- a/pkg/util/b64.go
+++ b/pkg/util/b64.go
@@ -2,12 +2,14 @@ package util
 
 import (
 	"bytes"
+	"compress/gzip"
 	b64 "encoding/base64"
 	"encoding/json"
 	"fmt"
-	"gopkg.in/yaml.v3"
 	"strconv"
 	"strings"
+
+	"gopkg.in/yaml.v3"
 )
 
 func ToArray(A []int) string {
@@ -51,6 +53,24 @@ func ToJsonB64(something any) (string, error) {
 	return toB64(b), nil
 }
 
+func ToJsonGzipB64(something any) (string, error) {
+	b, err := json.Marshal(something)
+	if err != nil {
+		return "", err
+	}
+
+	var buf bytes.Buffer
+	writer := gzip.NewWriter(&buf)
+	if _, err := writer.Write(b); err != nil {
+		return "", err
+	}
+	if err = writer.Close(); err != nil {
+		return "", err
+	}
+
+	return toB64(buf.Bytes()), nil
+}
+
 type EnvEntry struct {
 	Name  string `json:"name"`
 	Value string `json:"value"`
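
A closing note on the encode/decode pair: pkg/be/ibmcloud/create.go encodes each component and the LLIR with util.ToJsonGzipB64 above, and pkg/runtime/run-locally.go open-codes the inverse twice (base64 decode, gunzip, JSON decode). A shared inverse helper could fold that duplication. The following is a minimal sketch only, assuming the encoding produced by ToJsonGzipB64; FromJsonGzipB64 is a hypothetical name, not part of this patch:

	package util

	import (
		"bytes"
		"compress/gzip"
		b64 "encoding/base64"
		"encoding/json"
	)

	// FromJsonGzipB64 reverses ToJsonGzipB64: base64-decode, gunzip, then
	// JSON-decode into out (which should be a pointer).
	func FromJsonGzipB64(encoded string, out any) error {
		raw, err := b64.StdEncoding.DecodeString(encoded)
		if err != nil {
			return err
		}
		reader, err := gzip.NewReader(bytes.NewReader(raw))
		if err != nil {
			return err
		}
		defer reader.Close()
		return json.NewDecoder(reader).Decode(out)
	}

With such a helper, RunLocally would reduce to two calls, e.g. util.FromJsonGzipB64(component, &c) and util.FromJsonGzipB64(lowerir, &ir), followed by shell.Spawn.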