From de2e36315fb4d80e7000f6fe4601670d986add2c Mon Sep 17 00:00:00 2001 From: aavarghese Date: Wed, 13 Nov 2024 17:40:40 -0500 Subject: [PATCH] feat: ibmcloud be to use local setup rather than k8 kind cluster Signed-off-by: aavarghese --- cmd/subcommands/component.go | 1 + cmd/subcommands/component/run-locally.go | 42 ++++ cmd/subcommands/queue/drain.go | 2 +- cmd/subcommands/queue/ls.go | 2 +- cmd/subcommands/queue/upload.go | 2 +- pkg/be/backend.go | 5 +- pkg/be/ibmcloud/create.go | 236 +++++++++++++---------- pkg/be/ibmcloud/image.go | 36 ++++ pkg/be/ibmcloud/queue.go | 25 ++- pkg/be/kubernetes/image.go | 12 ++ pkg/be/kubernetes/queue.go | 2 +- pkg/be/local/image.go | 12 ++ pkg/be/local/queue.go | 2 +- pkg/be/local/shell/job.go | 5 +- pkg/be/local/shell/spawn.go | 8 +- pkg/be/local/spawn.go | 6 +- pkg/be/local/up.go | 8 +- pkg/boot/alldone.go | 4 +- pkg/boot/failures.go | 4 +- pkg/boot/io.go | 8 +- pkg/boot/up.go | 35 +++- pkg/client/queue/rclone.go | 109 +++++++++++ pkg/observe/qstat/stream.go | 2 +- pkg/observe/qstat/ui.go | 3 + pkg/runtime/builtins/cat.go | 4 +- pkg/runtime/queue/client.go | 4 +- pkg/runtime/queue/drain.go | 4 +- pkg/runtime/queue/ls.go | 4 +- pkg/runtime/queue/qcat.go | 2 +- pkg/runtime/queue/upload.go | 11 +- pkg/runtime/run-locally.go | 49 +++++ 31 files changed, 498 insertions(+), 151 deletions(-) create mode 100644 cmd/subcommands/component/run-locally.go create mode 100644 pkg/be/ibmcloud/image.go create mode 100644 pkg/be/kubernetes/image.go create mode 100644 pkg/be/local/image.go create mode 100644 pkg/client/queue/rclone.go create mode 100644 pkg/runtime/run-locally.go diff --git a/cmd/subcommands/component.go b/cmd/subcommands/component.go index aaca1826..934e6314 100644 --- a/cmd/subcommands/component.go +++ b/cmd/subcommands/component.go @@ -17,4 +17,5 @@ func init() { cmd.AddCommand(component.Minio()) cmd.AddCommand(component.Worker()) cmd.AddCommand(component.WorkStealer()) + cmd.AddCommand(component.RunLocally()) } diff --git a/cmd/subcommands/component/run-locally.go b/cmd/subcommands/component/run-locally.go new file mode 100644 index 00000000..1b262d28 --- /dev/null +++ b/cmd/subcommands/component/run-locally.go @@ -0,0 +1,42 @@ +package component + +import ( + "context" + + "github.com/spf13/cobra" + "lunchpail.io/cmd/options" + "lunchpail.io/pkg/build" + "lunchpail.io/pkg/runtime" +) + +type RunLocallyOptions struct { + Component string + LLIR string + build.LogOptions +} + +func AddRunLocallyOptions(cmd *cobra.Command) *RunLocallyOptions { + options := RunLocallyOptions{} + cmd.Flags().StringVarP(&options.Component, "component", "", "", "") + cmd.Flags().StringVar(&options.LLIR, "llir", "", "") + cmd.MarkFlagRequired("component") + cmd.MarkFlagRequired("llir") + return &options +} + +func RunLocally() *cobra.Command { + cmd := &cobra.Command{ + Use: "run-locally", + Short: "Commands for running a component locally", + Long: "Commands for running a component locally", + } + + runOpts := AddRunLocallyOptions(cmd) + options.AddLogOptions(cmd) + + cmd.RunE = func(cmd *cobra.Command, args []string) error { + return runtime.RunLocally(context.Background(), runOpts.Component, runOpts.LLIR, runOpts.LogOptions) + } + + return cmd +} diff --git a/cmd/subcommands/queue/drain.go b/cmd/subcommands/queue/drain.go index 87579d1d..293812ca 100644 --- a/cmd/subcommands/queue/drain.go +++ b/cmd/subcommands/queue/drain.go @@ -43,7 +43,7 @@ func Drain() *cobra.Command { return err } - return queue.Drain(ctx, backend, run.ForStep(step), *opts.Log) + return queue.Drain(ctx, backend, run.ForStep(step), opts.Queue, *opts.Log) } return cmd diff --git a/cmd/subcommands/queue/ls.go b/cmd/subcommands/queue/ls.go index 29a3cede..4914646f 100644 --- a/cmd/subcommands/queue/ls.go +++ b/cmd/subcommands/queue/ls.go @@ -60,7 +60,7 @@ func Ls() *cobra.Command { return err } - files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, *opts.Log) + files, errors, err := queue.Ls(ctx, backend, runContext.ForStep(step), path, opts.Queue, *opts.Log) if err != nil { return err } diff --git a/cmd/subcommands/queue/upload.go b/cmd/subcommands/queue/upload.go index 31b2d211..4eb5ad95 100644 --- a/cmd/subcommands/queue/upload.go +++ b/cmd/subcommands/queue/upload.go @@ -47,7 +47,7 @@ func Upload() *cobra.Command { run = rrun.Name } - return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, *opts.Log) + return queue.UploadFiles(ctx, backend, q.RunContext{RunName: run}, []upload.Upload{upload.Upload{LocalPath: args[0], Bucket: args[1]}}, opts.Queue, *opts.Log) } return cmd diff --git a/pkg/be/backend.go b/pkg/be/backend.go index c8ee5351..040cfacb 100644 --- a/pkg/be/backend.go +++ b/pkg/be/backend.go @@ -34,8 +34,11 @@ type Backend interface { InstanceCount(ctx context.Context, c lunchpail.Component, run queue.RunContext) (int, error) // Queue properties for a given run, plus ensure access to the endpoint from this client - AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) + AccessQueue(ctx context.Context, run queue.RunContext, rclone string, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) // Return a streamer Streamer(ctx context.Context, run queue.RunContext) streamer.Streamer + + // Build any images(s) needed for Up + CreateImage(ctx context.Context, linked llir.LLIR, opts llir.Options, destroy bool) (string, error) } diff --git a/pkg/be/ibmcloud/create.go b/pkg/be/ibmcloud/create.go index cb9af3be..01126822 100644 --- a/pkg/be/ibmcloud/create.go +++ b/pkg/be/ibmcloud/create.go @@ -6,8 +6,8 @@ import ( "fmt" "math" "net/http" + "os" "os/exec" - "strconv" "strings" "sync" "time" @@ -17,11 +17,9 @@ import ( "github.com/elotl/cloud-init/config" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" + q "lunchpail.io/pkg/ir/queue" - "lunchpail.io/pkg/be/kubernetes" - "lunchpail.io/pkg/be/kubernetes/common" "lunchpail.io/pkg/ir/llir" - "lunchpail.io/pkg/lunchpail" "lunchpail.io/pkg/util" ) @@ -51,7 +49,7 @@ func (i *intCounter) inc() { i.lock.Unlock() } -func createInstance(vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, c llir.ShellComponent, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options) (*vpcv1.Instance, error) { +func createInstance(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vpcID string, keyID string, zone string, profile string, subnetID string, secGroupID string, imageID string, namespace string, copts llir.Options, cc *config.CloudConfig) (*vpcv1.Instance, error) { networkInterfacePrototypeModel := &vpcv1.NetworkInterfacePrototype{ Name: &name, Subnet: &vpcv1.SubnetIdentityByID{ @@ -62,29 +60,6 @@ func createInstance(vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, c llir.S }}, } - // TODO pass through actual Cli Options? - opts := common.Options{Options: copts} - - appYamlString, err := kubernetes.MarshalComponentAsStandalone(ir, c, namespace, opts) - if err != nil { - return nil, fmt.Errorf("failed to marshall yaml: %v", err) - } - cc := &config.CloudConfig{ - WriteFiles: []config.File{ - { - Path: "/app.yaml", - Content: appYamlString, - Owner: "root:root", - RawFilePermissions: "0644", - }}, - RunCmd: []string{"sleep 10", //Minimum of 10 seconds needed for cluster to be able to run `apply` - "while ! kind get clusters | grep lunchpail; do sleep 2; done", - "echo 'Kind cluster is ready'", - "env HOME=/root kubectl create ns " + namespace, - "n=0; until [ $n -ge 60 ]; do env HOME=/root kubectl get serviceaccount default -o name -n " + namespace + " && break; n=$((n + 1)); sleep 1; done", - "env HOME=/root kubectl create -f /app.yaml -n " + namespace}, - } - instancePrototypeModel := &vpcv1.InstancePrototypeInstanceByImage{ Name: &name, ResourceGroup: &vpcv1.ResourceGroupIdentity{ @@ -273,104 +248,144 @@ func createVPC(vpcService *vpcv1.VpcV1, name string, appName string, resourceGro return *vpc.ID, nil } -func createAndInitVM(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) error { +func createImage(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, vmID string) (string, error) { + options := &vpcv1.CreateImageOptions{ + ImagePrototype: &vpcv1.ImagePrototype{ + Name: &name, + ResourceGroup: &vpcv1.ResourceGroupIdentity{ + ID: &resourceGroupID, + }, + SourceVolume: &vpcv1.VolumeIdentityByID{ + ID: &vmID, + }, + }, + } + image, response, err := vpcService.CreateImage(options) + if err != nil { + return "", fmt.Errorf("failed to create an Image: %v and the response is: %s", err, response) + } + return *image.ID, nil +} + +func createResources(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, keyType string, publicKey string, zone string, profile string, imageID string, namespace string, opts llir.Options) (string, error) { + var instanceID string t1s := time.Now() vpcID, err := createVPC(vpcService, name, ir.AppName, resourceGroupID) if err != nil { - return err + return "", err } t1e := time.Now() t2s := t1e keyID, err := createSSHKey(vpcService, name, resourceGroupID, keyType, publicKey) if err != nil { - return err + return "", err } t2e := time.Now() t3s := t2e subnetID, err := createSubnet(vpcService, name, resourceGroupID, vpcID, zone) if err != nil { - return err + return "", err } t3e := time.Now() t4s := t3e secGroupID, err := createSecurityGroup(vpcService, name, resourceGroupID, vpcID) if err != nil { - return err + return "", err } t4e := time.Now() t5s := t4e if err = createSecurityGroupRule(vpcService, secGroupID); err != nil { - return err + return "", err } t5e := time.Now() - group, _ := errgroup.WithContext(ctx) t6s := time.Now() - // One Component for WorkStealer, one for Dispatcher, and each per WorkerPool - poolCount := intCounter{} + if len(ir.Components) > 0 { + if err = createVMForComponents(ctx, vpcService, name, ir, resourceGroupID, zone, profile, imageID, namespace, vpcID, keyID, subnetID, secGroupID, opts); err != nil { + return "", err + } + } else { + instanceID, err = createVMForImage(vpcService, name, resourceGroupID, zone, profile, imageID, namespace, vpcID, keyID, subnetID, secGroupID, opts) + if err != nil { + return "", err + } + } + t6e := time.Now() + + if opts.Log.Verbose { + fmt.Fprintf(os.Stderr, "Setup done %s\n", util.RelTime(t1s, t6e)) + fmt.Fprintf(os.Stderr, " - VPC %s\n", util.RelTime(t1s, t1e)) + fmt.Fprintf(os.Stderr, " - SSH %s\n", util.RelTime(t2s, t2e)) + fmt.Fprintf(os.Stderr, " - Subnet %s\n", util.RelTime(t3s, t3e)) + fmt.Fprintf(os.Stderr, " - SecurityGroup %s\n", util.RelTime(t4s, t4e)) + fmt.Fprintf(os.Stderr, " - SecurityGroupRule %s\n", util.RelTime(t5s, t5e)) + fmt.Fprintf(os.Stderr, " - VMs %s\n", util.RelTime(t6s, t6e)) + } + return instanceID, nil +} + +func createVMForComponents(ctx context.Context, vpcService *vpcv1.VpcV1, name string, ir llir.LLIR, resourceGroupID string, zone string, profile string, imageID string, namespace string, vpcID string, keyID string, subnetID string, secGroupID string, opts llir.Options) error { + group, _ := errgroup.WithContext(ctx) + var verboseFlag string + for _, c := range ir.Components { instanceName := name + "-" + string(c.C()) - group.Go(func() error { - if c.C() == lunchpail.DispatcherComponent || c.C() == lunchpail.WorkStealerComponent { - instance, err := createInstance(vpcService, instanceName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts) - if err != nil { - return err - } + if opts.Log.Verbose { + fmt.Fprintf(os.Stderr, "Creating VM %s\n", instanceName) + } - //TODO VSI instances other than jumpbox or main pod should not have floatingIP. Remove below after testing - floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone) - if err != nil { - return err - } + componentB64, err := util.ToJsonB64(c) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return err + } - options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{ - ID: &floatingIPID, - InstanceID: instance.ID, - NetworkInterfaceID: instance.PrimaryNetworkInterface.ID, - } - _, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options) - if err != nil { - return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response) - } - } else if c.C() == lunchpail.WorkersComponent { - poolCount.inc() - workerCount := c.Workers() - poolName := instanceName + strconv.Itoa(poolCount.counter) //multiple worker pools, maybe - - //Compute number of VSIs to be provisioned and job parallelism for each VSI - parallelism, numInstances, err := computeParallelismAndInstanceCount(vpcService, profile, int32(workerCount)) - if err != nil { - return fmt.Errorf("failed to compute number of instances and job parallelism: %v", err) - } + llirB64, err := util.ToJsonB64(ir) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return err + } - for i := 0; i < numInstances; i++ { - workerName := poolName + "-" + strconv.Itoa(i) //multiple worker instances - c = c.SetWorkers(int(parallelism[i])) - instance, err := createInstance(vpcService, workerName, ir, c, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts) - if err != nil { - return err - } - - floatingIPID, err := createFloatingIP(vpcService, workerName, resourceGroupID, zone) - if err != nil { - return err - } - - options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{ - ID: &floatingIPID, - InstanceID: instance.ID, - NetworkInterfaceID: instance.PrimaryNetworkInterface.ID, - } - _, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options) - if err != nil { - return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response) - } - } + if opts.Log.Verbose { + verboseFlag = "--verbose" + } + + cc := &config.CloudConfig{ + RunCmd: []string{"curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /minio-binaries/mc", + "chmod +x /minio-binaries/mc", + "export PATH=$PATH:/minio-binaries/", + "mc alias set myminio " + ir.Context.Queue.Endpoint + " " + ir.Context.Queue.AccessKey + " " + ir.Context.Queue.SecretKey, // setting mc config + "apt-get install jq -y", + "exec=$(mc stat myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/ --json | jq -r '.name')", + "mc get myminio/" + ir.Context.Queue.Bucket + "/" + ir.Context.Run.AsFile(q.Blobs) + "/$exec /lunchpail", //use mc client to download binary + "chmod +x /lunchpail", + "env HOME=/root /lunchpail component run-locally --component " + string(componentB64) + " --llir " + string(llirB64) + " " + verboseFlag}, + } + + //TODO: Compute number of VSIs to be provisioned and job parallelism for each VSI based on number of workers and workerpools + group.Go(func() error { + instance, err := createInstance(vpcService, instanceName, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts, cc) + if err != nil { + return err + } + + floatingIPID, err := createFloatingIP(vpcService, instanceName, resourceGroupID, zone) + if err != nil { + return err + } + options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{ + ID: &floatingIPID, + InstanceID: instance.ID, + NetworkInterfaceID: instance.PrimaryNetworkInterface.ID, + } + _, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options) + if err != nil { + return fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response) } return nil }) @@ -378,18 +393,39 @@ func createAndInitVM(ctx context.Context, vpcService *vpcv1.VpcV1, name string, if err := group.Wait(); err != nil { return err } - t6e := time.Now() - - fmt.Printf("Setup done %s\n", util.RelTime(t1s, t6e)) - fmt.Printf(" - VPC %s\n", util.RelTime(t1s, t1e)) - fmt.Printf(" - SSH %s\n", util.RelTime(t2s, t2e)) - fmt.Printf(" - Subnet %s\n", util.RelTime(t3s, t3e)) - fmt.Printf(" - SecurityGroup %s\n", util.RelTime(t4s, t4e)) - fmt.Printf(" - SecurityGroupRule %s\n", util.RelTime(t5s, t5e)) - fmt.Printf(" - VMs %s\n", util.RelTime(t6s, t6e)) return nil } +func createVMForImage(vpcService *vpcv1.VpcV1, name string, resourceGroupID string, zone string, profile string, imageID string, namespace string, vpcID string, keyID string, subnetID string, secGroupID string, opts llir.Options) (string, error) { + if opts.Log.Verbose { + fmt.Printf("Creating VM %s for custom image creations\n", name) + } + + cc := &config.CloudConfig{ + RunCmd: []string{}, + } + instance, err := createInstance(vpcService, name, resourceGroupID, vpcID, keyID, zone, profile, subnetID, secGroupID, imageID, namespace, opts, cc) + if err != nil { + return "", err + } + + floatingIPID, err := createFloatingIP(vpcService, name, resourceGroupID, zone) + if err != nil { + return "", err + } + + options := &vpcv1.AddInstanceNetworkInterfaceFloatingIPOptions{ + ID: &floatingIPID, + InstanceID: instance.ID, + NetworkInterfaceID: instance.PrimaryNetworkInterface.ID, + } + _, response, err := vpcService.AddInstanceNetworkInterfaceFloatingIP(options) + if err != nil { + return "", fmt.Errorf("failed to add floating IP to network interface: %v and the response is: %s", err, response) + } + return *instance.ID, err +} + func (backend Backend) SetAction(ctx context.Context, opts llir.Options, ir llir.LLIR, action Action) error { runname := ir.RunName() @@ -406,7 +442,7 @@ func (backend Backend) SetAction(ctx context.Context, opts llir.Options, ir llir } zone = randomZone } - if err := createAndInitVM(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil { + if _, err := createResources(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts); err != nil { return err } } diff --git a/pkg/be/ibmcloud/image.go b/pkg/be/ibmcloud/image.go new file mode 100644 index 00000000..2441e6bf --- /dev/null +++ b/pkg/be/ibmcloud/image.go @@ -0,0 +1,36 @@ +package ibmcloud + +import ( + "context" + + "lunchpail.io/pkg/ir/llir" +) + +func (backend Backend) CreateImage(ctx context.Context, ir llir.LLIR, opts llir.Options, destroy bool) (string, error) { + runname := ir.RunName() + + zone := opts.Zone //command line zone value + if zone == "" { //random zone value using config + randomZone, err := getRandomizedZone(backend.config, backend.vpcService) //Todo: spread among random zones with a subnet in each zone + if err != nil { + return "", err + } + zone = randomZone + } + instanceID, err := createResources(ctx, backend.vpcService, runname, ir, backend.config.ResourceGroup.GUID, backend.sshKeyType, backend.sshPublicKey, zone, opts.Profile, opts.ImageID, backend.namespace, opts) + if err != nil { + return "", err + } + + imageID, err := createImage(backend.vpcService, runname, backend.config.ResourceGroup.GUID, instanceID) + if err != nil { + return "", err + } + + if destroy { + if err := stopOrDeleteVM(backend.vpcService, runname, backend.config.ResourceGroup.GUID, true); err != nil { + return "", err + } + } + return imageID, nil +} diff --git a/pkg/be/ibmcloud/queue.go b/pkg/be/ibmcloud/queue.go index d8b58cd5..3ebff27a 100644 --- a/pkg/be/ibmcloud/queue.go +++ b/pkg/be/ibmcloud/queue.go @@ -3,13 +3,34 @@ package ibmcloud import ( "context" "fmt" + "os" "lunchpail.io/pkg/build" + q "lunchpail.io/pkg/client/queue" "lunchpail.io/pkg/ir/queue" ) // Queue properties for a given run, plus ensure access to the endpoint from this client -func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { - err = fmt.Errorf("Unsupported operation: 'AccessQueue'") +func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, rclone string, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { + endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(rclone) + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Accessing Queue with endpoint %s and bucket %s\n", endpoint, bucket) + } + stop = func() {} + return +} + +func (backend Backend) queue(rclone string) (endpoint, accessKeyID, secretAccessKey, bucket string, err error) { + var spec queue.Spec + if rclone != "" { + spec, err = q.AccessQueue(rclone) + if err != nil { + return + } + } + endpoint = spec.Endpoint + accessKeyID = spec.AccessKey + secretAccessKey = spec.SecretKey + bucket = spec.Bucket return } diff --git a/pkg/be/kubernetes/image.go b/pkg/be/kubernetes/image.go new file mode 100644 index 00000000..22ff1049 --- /dev/null +++ b/pkg/be/kubernetes/image.go @@ -0,0 +1,12 @@ +package kubernetes + +import ( + "context" + "fmt" + + "lunchpail.io/pkg/ir/llir" +) + +func (backend Backend) CreateImage(ctx context.Context, ir llir.LLIR, opts llir.Options, destroy bool) (string, error) { + return "", fmt.Errorf("Unsupported operation: 'CreateImage'") +} diff --git a/pkg/be/kubernetes/queue.go b/pkg/be/kubernetes/queue.go index 567b0c91..9c77851c 100644 --- a/pkg/be/kubernetes/queue.go +++ b/pkg/be/kubernetes/queue.go @@ -16,7 +16,7 @@ import ( ) // Queue properties for a given run, plus ensure access to the endpoint from this client -func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { +func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, rclone string, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run) if err != nil { return diff --git a/pkg/be/local/image.go b/pkg/be/local/image.go new file mode 100644 index 00000000..4737ff34 --- /dev/null +++ b/pkg/be/local/image.go @@ -0,0 +1,12 @@ +package local + +import ( + "context" + "fmt" + + "lunchpail.io/pkg/ir/llir" +) + +func (backend Backend) CreateImage(ctx context.Context, ir llir.LLIR, opts llir.Options, destroy bool) (string, error) { + return "", fmt.Errorf("Unsupported operation: 'CreateImage'") +} diff --git a/pkg/be/local/queue.go b/pkg/be/local/queue.go index 6dd2bbe0..6ef023b9 100644 --- a/pkg/be/local/queue.go +++ b/pkg/be/local/queue.go @@ -14,7 +14,7 @@ import ( ) // Queue properties for a given run, plus ensure access to the endpoint from this client -func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { +func (backend Backend) AccessQueue(ctx context.Context, run queue.RunContext, rclone string, opts build.LogOptions) (endpoint, accessKeyID, secretAccessKey, bucket string, stop func(), err error) { endpoint, accessKeyID, secretAccessKey, bucket, err = backend.queue(ctx, run) stop = func() {} return diff --git a/pkg/be/local/shell/job.go b/pkg/be/local/shell/job.go index 4791f70f..7f1136b3 100644 --- a/pkg/be/local/shell/job.go +++ b/pkg/be/local/shell/job.go @@ -11,16 +11,15 @@ import ( ) // Run the component as a "job", with multiple workers -func SpawnJob(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error { +func SpawnJob(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error { if c.InitialWorkers < 1 { return fmt.Errorf("Invalid worker count %d for %v", c.InitialWorkers, c.C()) } - group, jobCtx := errgroup.WithContext(ctx) for workerIdx := range c.InitialWorkers { group.Go(func() error { - return Spawn(jobCtx, c.WithInstanceName(fmt.Sprintf("w%d", workerIdx)), ir, logdir, opts) + return Spawn(jobCtx, c.WithInstanceName(fmt.Sprintf("w%d", workerIdx)), ir, opts) }) } diff --git a/pkg/be/local/shell/spawn.go b/pkg/be/local/shell/spawn.go index 794c2554..311fa9fa 100644 --- a/pkg/be/local/shell/spawn.go +++ b/pkg/be/local/shell/spawn.go @@ -17,7 +17,7 @@ import ( "lunchpail.io/pkg/ir/llir" ) -func Spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error { +func Spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error { pidfile, err := files.Pidfile(ir.Context.Run, c.InstanceName, c.C(), true) if err != nil { return err @@ -29,6 +29,12 @@ func Spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir stri } defer os.RemoveAll(workdir) + // This is where component logs will go + logdir, err := files.LogDir(ir.Context.Run, true) + if err != nil { + return err + } + // tee command output to the logdir instance := strings.Replace(strings.Replace(c.InstanceName, ir.RunName(), "", 1), "--", "-", 1) logfile := files.LogFileForComponent(c.C()) diff --git a/pkg/be/local/spawn.go b/pkg/be/local/spawn.go index 97ba604e..b2efb679 100644 --- a/pkg/be/local/spawn.go +++ b/pkg/be/local/spawn.go @@ -8,10 +8,10 @@ import ( "lunchpail.io/pkg/ir/llir" ) -func (backend Backend) spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir string, opts build.LogOptions) error { +func (backend Backend) spawn(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, opts build.LogOptions) error { if c.RunAsJob { - return shell.SpawnJob(ctx, c, ir, logdir, opts) + return shell.SpawnJob(ctx, c, ir, opts) } else { - return shell.Spawn(ctx, c, ir, logdir, opts) + return shell.Spawn(ctx, c, ir, opts) } } diff --git a/pkg/be/local/up.go b/pkg/be/local/up.go index ece362a2..e6290d10 100644 --- a/pkg/be/local/up.go +++ b/pkg/be/local/up.go @@ -26,12 +26,6 @@ func (backend Backend) Up(octx context.Context, ir llir.LLIR, opts llir.Options, } } - // This is where component logs will go - logdir, err := files.LogDir(ir.Context.Run, true) - if err != nil { - return err - } - // Write a pid file to indicate the pid of this process if pidfile, err := files.PidfileForMain(ir.Context.Run); err != nil { return err @@ -47,7 +41,7 @@ func (backend Backend) Up(octx context.Context, ir llir.LLIR, opts llir.Options, // Launch each of the components group, ctx := errgroup.WithContext(octx) for _, c := range ir.Components { - group.Go(func() error { return backend.spawn(ctx, c, ir, logdir, *opts.Log) }) + group.Go(func() error { return backend.spawn(ctx, c, ir, *opts.Log) }) } // Indicate that we are off to the races diff --git a/pkg/boot/alldone.go b/pkg/boot/alldone.go index 8bf09ce7..c7a216db 100644 --- a/pkg/boot/alldone.go +++ b/pkg/boot/alldone.go @@ -12,8 +12,8 @@ import ( s3 "lunchpail.io/pkg/runtime/queue" ) -func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error { - client, err := s3.NewS3ClientForRun(ctx, backend, run, opts) +func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContext, rclone string, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { if strings.Contains(err.Error(), "Connection closed") { // already gone diff --git a/pkg/boot/failures.go b/pkg/boot/failures.go index bb364826..036994b4 100644 --- a/pkg/boot/failures.go +++ b/pkg/boot/failures.go @@ -13,8 +13,8 @@ import ( s3 "lunchpail.io/pkg/runtime/queue" ) -func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error { - client, err := s3.NewS3ClientForRun(ctx, backend, run, opts) +func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, rclone string, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return err } diff --git a/pkg/boot/io.go b/pkg/boot/io.go index 6f854a45..10aa4d2f 100644 --- a/pkg/boot/io.go +++ b/pkg/boot/io.go @@ -18,8 +18,8 @@ import ( ) // Behave like `cat inputs | ... > outputs` -func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir llir.LLIR, alldone <-chan struct{}, noRedirect bool, redirectTo string, opts build.LogOptions) error { - client, err := s3.NewS3ClientForRun(ctx, backend, ir.Context.Run, opts) +func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir llir.LLIR, alldone <-chan struct{}, noRedirect bool, redirectTo string, rclone string, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, ir.Context.Run, rclone, opts) if err != nil { return err } @@ -91,8 +91,8 @@ func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir } // For Step > 0, we will need to simulate that a dispatch is done -func fakeDispatch(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error { - client, err := s3.NewS3ClientForRun(ctx, backend, run, opts) +func fakeDispatch(ctx context.Context, backend be.Backend, run queue.RunContext, rclone string, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return err } diff --git a/pkg/boot/up.go b/pkg/boot/up.go index a0991216..1b7ee74c 100644 --- a/pkg/boot/up.go +++ b/pkg/boot/up.go @@ -6,10 +6,12 @@ import ( "context" "fmt" "os" + "os/exec" "os/signal" "strings" "lunchpail.io/pkg/be" + "lunchpail.io/pkg/be/target" "lunchpail.io/pkg/build" "lunchpail.io/pkg/fe" "lunchpail.io/pkg/ir/hlir" @@ -151,7 +153,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption case <-cancellable.Done(): case ctx := <-isRunning6: if ctx.Run.Step == 0 || isFinalStep(ctx) { - errorFromAllDone = waitForAllDone(cancellable, backend, ctx.Run, *opts.BuildOptions.Log) + errorFromAllDone = waitForAllDone(cancellable, backend, ctx.Run, opts.BuildOptions.Queue, *opts.BuildOptions.Log) if errorFromAllDone != nil && strings.Contains(errorFromAllDone.Error(), "connection refused") { // Then Minio went away on its own. That's probably ok. errorFromAllDone = nil @@ -174,7 +176,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption } defer func() { redirectDone <- struct{}{} }() - if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, alldone, opts.NoRedirect, opts.RedirectTo, *opts.BuildOptions.Log); err != nil { + if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, alldone, opts.NoRedirect, opts.RedirectTo, opts.BuildOptions.Queue, *opts.BuildOptions.Log); err != nil { errorFromIo = err cancel() } @@ -215,13 +217,14 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption case <-cancellable.Done(): case <-isRunning6: } - if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil { + if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, opts.BuildOptions.Queue, *opts.BuildOptions.Log); err != nil { errorFromTask = err // fail fast? cancel() } }() //inject executable into s3 + fmt.Fprintln(os.Stderr, "opts.Executable "+opts.Executable) if opts.Executable != "" { go func() { // wait for the run to be ready for us to enqueue @@ -230,10 +233,32 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption case <-isRunning6: } - if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil { - fmt.Fprintln(os.Stderr, err) + if opts.BuildOptions.Target.Platform == target.IBMCloud { + //rebuilding self to upload linux-amd64 executable + cmd := exec.Command("/bin/sh", "-c", opts.Executable+" build -A -o "+opts.Executable) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + fmt.Fprintln(os.Stderr, err) + } + if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable + "-linux-amd64", TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, opts.BuildOptions.Queue, *opts.BuildOptions.Log); err != nil { + fmt.Fprintln(os.Stderr, err) + } + } else { + if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, opts.BuildOptions.Queue, *opts.BuildOptions.Log); err != nil { + fmt.Fprintln(os.Stderr, err) + } } }() + + /* if opts.BuildOptions.Target.Platform == target.IBMCloud { + //use the uploaded executable to create an IBM Cloud custom image for VPC using a VSI boot volume + imageID, err := backend.CreateImage(cancellable, ir, opts.BuildOptions, true) // destroys resources after image creation TODO: reuse resources on Up + if err != nil { + return err + } + opts.BuildOptions.ImageID = imageID + } */ } defer cancel() diff --git a/pkg/client/queue/rclone.go b/pkg/client/queue/rclone.go new file mode 100644 index 00000000..f7f8e739 --- /dev/null +++ b/pkg/client/queue/rclone.go @@ -0,0 +1,109 @@ +package queue + +import ( + "fmt" + "os" + "regexp" + "strconv" + "strings" + + rcloneConfig "github.com/rclone/rclone/fs/config" + rcloneConfigFile "github.com/rclone/rclone/fs/config/configfile" + + "lunchpail.io/pkg/ir/queue" +) + +func AccessQueue(flag string) (queue.Spec, error) { + + isRclone, spec, err := parseFlagAsRclone(flag) + + if err != nil { + return queue.Spec{}, err + } else if flag != "" && !isRclone { + return queue.Spec{}, fmt.Errorf("Unsupported scheme for queue: '%s'", flag) + } + + return spec, nil +} + +func specFromRcloneRemoteName(remoteName, bucket string) (bool, queue.Spec, error) { + spec := queue.Spec{ + Auto: true, + Bucket: bucket, + } + + if os.Getenv("RCLONE_CONFIG") != "" { + if err := rcloneConfig.SetConfigPath(os.Getenv("RCLONE_CONFIG")); err != nil { + return false, queue.Spec{}, err + } + } + rcloneConfigFile.Install() // otherwise, DumpRcRemote() yields an empty map + config := rcloneConfig.DumpRcRemote(remoteName) + + if maybe, ok := config["endpoint"]; !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' is missing endpoint %v || %v", remoteName, config, rcloneConfig.LoadedData()) + } else if s, ok := maybe.(string); !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' has invalid endpoint value: '%s'", remoteName, maybe) + } else { + spec.Endpoint = s + words := strings.Split(spec.Endpoint, ":") + if len(words) == 3 { + p, err := strconv.Atoi(words[2]) + if err != nil { + return false, queue.Spec{}, err + } + spec.Port = p + } + if !isInternalS3(s) { + spec.Auto = false + } + } + + if maybe, ok := config["access_key_id"]; !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' is missing access_key_id", remoteName) + } else if s, ok := maybe.(string); !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' has invalid access_key_id value: '%s'", remoteName, maybe) + } else { + spec.AccessKey = s + } + + if maybe, ok := config["secret_access_key"]; !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' is missing secret_access_key", remoteName) + } else if s, ok := maybe.(string); !ok { + return false, queue.Spec{}, fmt.Errorf("Rclone config '%s' has invalid secret_access_key value: '%s'", remoteName, maybe) + } else { + spec.SecretKey = s + } + + if spec.Endpoint == "" || spec.Endpoint == "$TEST_QUEUE_ENDPOINT" { + spec.Auto = true + } + + if strings.Contains(spec.AccessKey, "$TEST_QUEUE_ACCESSKEY") { + spec.AccessKey = strings.Replace(spec.AccessKey, "$TEST_QUEUE_ACCESSKEY", "lunchpail", -1) + } + + if strings.Contains(spec.SecretKey, "$TEST_QUEUE_SECRETKEY") { + spec.SecretKey = strings.Replace(spec.SecretKey, "$TEST_QUEUE_SECRETKEY", "lunchpail", -1) + } + + return true, spec, nil +} + +func parseFlagAsRclone(flag string) (bool, queue.Spec, error) { + rclonePattern := regexp.MustCompile("^rclone://([^/]+)/(.+)$") + if match := rclonePattern.FindStringSubmatch(flag); len(match) == 3 { + return specFromRcloneRemoteName(match[1], match[2]) + } else if strings.HasPrefix(flag, "rclone:") { + return false, queue.Spec{}, fmt.Errorf("Invalid --queue option. Must be of the form 'rclone://configname/bucketname'") + } + + return false, queue.Spec{}, nil +} + +// Follow convention for internalS3 name in charts/workstealer/templates/s3 below. +// Checks if hostname ends with the same suffix to determine if internalS3. +func isInternalS3(endpoint string) bool { + internalS3Suffix := "-minio" + return strings.HasSuffix(strings.Split(endpoint, ".")[0], internalS3Suffix) +} diff --git a/pkg/observe/qstat/stream.go b/pkg/observe/qstat/stream.go index e4ee5347..486d811c 100644 --- a/pkg/observe/qstat/stream.go +++ b/pkg/observe/qstat/stream.go @@ -24,7 +24,7 @@ func stream(ctx context.Context, runnameIn string, backend be.Backend, opts Opti fmt.Fprintln(os.Stderr, "Tracking run", runname) } - client, err := s3.NewS3ClientForRun(ctx, backend, queue.RunContext{RunName: runname, Step: opts.Step}, opts.LogOptions) + client, err := s3.NewS3ClientForRun(ctx, backend, queue.RunContext{RunName: runname, Step: opts.Step}, opts.Rclone, opts.LogOptions) if err != nil { return client.RunContext, nil, nil, nil, err } diff --git a/pkg/observe/qstat/ui.go b/pkg/observe/qstat/ui.go index 0f178010..b0b1ae45 100644 --- a/pkg/observe/qstat/ui.go +++ b/pkg/observe/qstat/ui.go @@ -24,6 +24,9 @@ type Options struct { // Step Step int + + //rclone path + Rclone string } func UI(ctx context.Context, runnameIn string, backend be.Backend, opts Options) error { diff --git a/pkg/runtime/builtins/cat.go b/pkg/runtime/builtins/cat.go index 50a8a413..8e21544d 100644 --- a/pkg/runtime/builtins/cat.go +++ b/pkg/runtime/builtins/cat.go @@ -26,8 +26,8 @@ func Cat(ctx context.Context, client s3.S3Client, run queue.RunContext, inputs [ return nil } -func CatClient(ctx context.Context, backend be.Backend, run queue.RunContext, inputs []string, opts build.LogOptions) error { - client, err := s3.NewS3ClientForRun(ctx, backend, run, opts) +func CatClient(ctx context.Context, backend be.Backend, run queue.RunContext, inputs []string, rclone string, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return err } diff --git a/pkg/runtime/queue/client.go b/pkg/runtime/queue/client.go index 2bcf16e4..44abde9e 100644 --- a/pkg/runtime/queue/client.go +++ b/pkg/runtime/queue/client.go @@ -70,8 +70,8 @@ func NewS3ClientFromOptions(ctx context.Context, opts S3ClientOptions) (S3Client } // Client for a given run in the given backend -func NewS3ClientForRun(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) (S3ClientStop, error) { - endpoint, accessKeyId, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, run, opts) +func NewS3ClientForRun(ctx context.Context, backend be.Backend, run queue.RunContext, rclone string, opts build.LogOptions) (S3ClientStop, error) { + endpoint, accessKeyId, secretAccessKey, bucket, stop, err := backend.AccessQueue(ctx, run, rclone, opts) if err != nil { return S3ClientStop{}, err } diff --git a/pkg/runtime/queue/drain.go b/pkg/runtime/queue/drain.go index ccb6d706..3192abe9 100644 --- a/pkg/runtime/queue/drain.go +++ b/pkg/runtime/queue/drain.go @@ -13,8 +13,8 @@ import ( ) // Drain the output tasks, allowing graceful termination -func Drain(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error { - c, err := NewS3ClientForRun(ctx, backend, run, opts) +func Drain(ctx context.Context, backend be.Backend, run queue.RunContext, rclone string, opts build.LogOptions) error { + c, err := NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return err } diff --git a/pkg/runtime/queue/ls.go b/pkg/runtime/queue/ls.go index 3eb27e4b..4e6b42af 100644 --- a/pkg/runtime/queue/ls.go +++ b/pkg/runtime/queue/ls.go @@ -10,8 +10,8 @@ import ( "lunchpail.io/pkg/ir/queue" ) -func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path string, opts build.LogOptions) (<-chan string, <-chan error, error) { - c, err := NewS3ClientForRun(ctx, backend, run, opts) +func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path string, rclone string, opts build.LogOptions) (<-chan string, <-chan error, error) { + c, err := NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return nil, nil, err } diff --git a/pkg/runtime/queue/qcat.go b/pkg/runtime/queue/qcat.go index be26a47e..1c579181 100644 --- a/pkg/runtime/queue/qcat.go +++ b/pkg/runtime/queue/qcat.go @@ -10,7 +10,7 @@ import ( ) func Qcat(ctx context.Context, backend be.Backend, run queue.RunContext, path string, opts build.LogOptions) error { - c, err := NewS3ClientForRun(ctx, backend, run, opts) + c, err := NewS3ClientForRun(ctx, backend, run, "", opts) if err != nil { return err } diff --git a/pkg/runtime/queue/upload.go b/pkg/runtime/queue/upload.go index 5d6d84ff..1efdda34 100644 --- a/pkg/runtime/queue/upload.go +++ b/pkg/runtime/queue/upload.go @@ -15,14 +15,13 @@ import ( "lunchpail.io/pkg/runtime/queue/upload" ) -func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, specs []upload.Upload, opts build.LogOptions) error { - s3, err := NewS3ClientForRun(ctx, backend, run, opts) +func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, specs []upload.Upload, rclone string, opts build.LogOptions) error { + s3, err := NewS3ClientForRun(ctx, backend, run, rclone, opts) if err != nil { return err } defer s3.Stop() run.Bucket = s3.RunContext.Bucket // TODO - for _, spec := range specs { bucket := spec.Bucket if bucket == "" { @@ -36,9 +35,9 @@ func UploadFiles(ctx context.Context, backend be.Backend, run queue.RunContext, return err } - if opts.Verbose { - fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s targetDir='%s'\n", spec.LocalPath, bucket, spec.TargetDir) - } + //if opts.Verbose { + fmt.Fprintf(os.Stderr, "Uploading files from local path=%s to s3 bucket=%s targetDir='%s'\n", spec.LocalPath, bucket, spec.TargetDir) + //} info, err := os.Stat(spec.LocalPath) if err != nil { return err diff --git a/pkg/runtime/run-locally.go b/pkg/runtime/run-locally.go new file mode 100644 index 00000000..5f71d214 --- /dev/null +++ b/pkg/runtime/run-locally.go @@ -0,0 +1,49 @@ +package runtime + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "os" + + "lunchpail.io/pkg/be/local/shell" + "lunchpail.io/pkg/build" + "lunchpail.io/pkg/ir/llir" +) + +func RunLocally(ctx context.Context, component string, lowerir string, opts build.LogOptions) error { + var c llir.ShellComponent + var ir llir.LLIR + var componentByte []byte + var lowerirByte []byte + var err error + + if componentByte, err = base64.StdEncoding.DecodeString(component); err != nil { + if opts.Verbose { + fmt.Fprintf(os.Stderr, "%v\n", err) + } + return err + } + if err := json.Unmarshal(componentByte, &c); err != nil { + if opts.Verbose { + fmt.Fprintf(os.Stderr, "%v\n", err) + } + return err + } + + if lowerirByte, err = base64.StdEncoding.DecodeString(lowerir); err != nil { + if opts.Verbose { + fmt.Fprintf(os.Stderr, "%v\n", err) + } + return err + } + if err := json.Unmarshal(lowerirByte, &ir); err != nil { + if opts.Verbose { + fmt.Fprintf(os.Stderr, "%v\n", err) + } + return err + } + + return shell.Spawn(ctx, c, ir, opts) +}