Skip to content

Commit

Permalink
Add JobObject support in Windows.
Browse files Browse the repository at this point in the history
Subprocesses started by reproxy in Windows are registered with a JobObject, which is
configured to terminate all remaining processes when the JobObject is termindated (which
happens on reproxy shutdown).

Bug: b/364360512
Test: manual runs don't cause issues, reproducing actual issue is difficult
Change-Id: I97ff4560646b9b3a50f2719a1eebb5b2ae1391a7
GitOrigin-RevId: 3e3af3bee4cdfd6d4e1e55131e3b40f74cdd2764
  • Loading branch information
MikeS-rec authored and copybara-github committed Sep 6, 2024
1 parent 16a9682 commit 1157864
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 35 deletions.
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ use_repo(
"com_github_gosuri_uilive",
"com_github_hectane_go_acl",
"com_github_karrick_godirwalk",
"com_github_kolesnikovae_go_winjob",
"com_github_microsoft_go_winio",
"com_github_pkg_xattr",
"com_github_shirou_gopsutil_v3",
Expand Down
6 changes: 6 additions & 0 deletions cmd/reproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ func main() {

st := filemetadata.NewSingleFlightCache()

subprocCleanup, err := subprocess.Setup()
if err != nil {
log.Exitf("Failed in subprocess setup: %v", err)
}
defer subprocCleanup()

exec := &subprocess.SystemExecutor{}
resMgr := localresources.NewFractionalDefaultManager(*localResourceFraction)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/hectane/go-acl v0.0.0-20230122075934-ca0b05cb1adb
github.com/karrick/godirwalk v1.17.0
github.com/kolesnikovae/go-winjob v1.0.0
github.com/pkg/xattr v0.4.4
github.com/shirou/gopsutil/v3 v3.24.4
github.com/vardius/progress-go v0.0.0-20221030221608-f948426036a9
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kolesnikovae/go-winjob v1.0.0 h1:OKEtCHB3sYNAiqNwGDhf08Y6luM7C8mP+42rp1N6SeE=
github.com/kolesnikovae/go-winjob v1.0.0/go.mod h1:k0joOLP3/NBrRmDQjPV2+oN1TPmEWt6arTNtFjVeQuM=
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
7 changes: 6 additions & 1 deletion internal/pkg/subprocess/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ go_library(
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/command",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/outerr",
"@com_github_golang_glog//:glog",
],
] + select({
"@io_bazel_rules_go//go/platform:windows": [
"@com_github_kolesnikovae_go_winjob//:go-winjob",
],
"//conditions:default": [],
}),
)

go_test(
Expand Down
31 changes: 0 additions & 31 deletions internal/pkg/subprocess/subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,6 @@ func (SystemExecutor) ExecuteWithOutErr(ctx context.Context, cmd *command.Comman
return err
}

// ExecuteInBackground executes the command in the background. Command result is written to the
// passed channel.
func (SystemExecutor) ExecuteInBackground(ctx context.Context, cmd *command.Command, oe outerr.OutErr, ch chan *command.Result) error {
cmdCtx, stdout, stderr, err := setupCommand(ctx, cmd)
if err != nil {
return err
}
if err = cmdCtx.Start(); err != nil {
log.V(2).Infof("Starting command %v >> err=%v", cmd.Args, err)
return err
}
go func() {
err := cmdCtx.Wait()
if err != nil {
log.V(2).Infof("Executed command %v\n >> err=%v", cmd.Args, err)
}
oe.WriteOut([]byte(stdout.String()))
oe.WriteErr([]byte(stderr.String()))
exitCode := 0
if exitErr, ok := err.(*exec.ExitError); ok && exitErr != nil {
exitCode = exitErr.ExitCode()
}
res := command.NewResultFromExitCode(exitCode)
if exitCode == 0 && err != nil {
res = command.NewLocalErrorResult(err)
}
ch <- res
}()
return nil
}

func setupCommand(ctx context.Context, cmd *command.Command) (*exec.Cmd, *strings.Builder, *strings.Builder, error) {
if len(cmd.Args) < 1 {
return nil, nil, nil, fmt.Errorf("command must have more than 1 argument")
Expand Down
14 changes: 12 additions & 2 deletions internal/pkg/subprocess/subprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ func TestExecuteWithOutErr(t *testing.T) {
}

func TestExecuteInBackground(t *testing.T) {
cleanup, err := Setup()
if err != nil {
t.Fatalf("Failed subprocess setup: %v", err)
}
defer cleanup()
oe := outerr.NewRecordingOutErr()
want := "Hello"
cmd := &command.Command{Args: []string{"bash", "-c", "sleep 1 && echo " + want}}
ch := make(chan *command.Result)
err := SystemExecutor{}.ExecuteInBackground(context.Background(), cmd, oe, ch)
err = SystemExecutor{}.ExecuteInBackground(context.Background(), cmd, oe, ch)
if err != nil {
t.Errorf("ExecuteInBackground(%v) failed with error: %v", cmd, err)
}
Expand All @@ -72,11 +77,16 @@ func TestExecuteInBackground(t *testing.T) {
}

func TestExecuteInBackgroundCancellation(t *testing.T) {
cleanup, err := Setup()
if err != nil {
t.Fatalf("Failed subprocess setup: %v", err)
}
defer cleanup()
oe := outerr.NewRecordingOutErr()
cmd := &command.Command{Args: []string{"bash", "-c", "sleep 2 && echo Hello"}}
ch := make(chan *command.Result)
ctx, cancel := context.WithCancel(context.Background())
err := SystemExecutor{}.ExecuteInBackground(ctx, cmd, oe, ch)
err = SystemExecutor{}.ExecuteInBackground(ctx, cmd, oe, ch)
if err != nil {
t.Errorf("ExecuteInBackground(%v) failed with error: %v", cmd, err)
}
Expand Down
45 changes: 45 additions & 0 deletions internal/pkg/subprocess/subprocess_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,24 @@
package subprocess

import (
"context"
"os"
"os/exec"
"syscall"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/command"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr"

log "github.com/golang/glog"
)

// Setup any necessary one-time things for subprocess management. On linux/mac,
// this is a no-op and returns a no-op cleanup function.
func Setup() (func(), error) {
// No-op on unix
return func() {}, nil
}

// Exists returns true if a pid is assigned to a process that is actively running.
// Based on comment from: https://github.com/golang/go/issues/34396
func Exists(pid int) (bool, error) {
Expand All @@ -38,3 +52,34 @@ func Exists(pid int) (bool, error) {
}
return false, nil
}

// ExecuteInBackground executes the command in the background. Command result is written to the
// passed channel.
func (SystemExecutor) ExecuteInBackground(ctx context.Context, cmd *command.Command, oe outerr.OutErr, ch chan *command.Result) error {
cmdCtx, stdout, stderr, err := setupCommand(ctx, cmd)
if err != nil {
return err
}
if err = cmdCtx.Start(); err != nil {
log.V(2).Infof("Starting command %v >> err=%v", cmd.Args, err)
return err
}
go func() {
err := cmdCtx.Wait()
if err != nil {
log.V(2).Infof("Executed command %v\n >> err=%v", cmd.Args, err)
}
oe.WriteOut([]byte(stdout.String()))
oe.WriteErr([]byte(stderr.String()))
exitCode := 0
if exitErr, ok := err.(*exec.ExitError); ok && exitErr != nil {
exitCode = exitErr.ExitCode()
}
res := command.NewResultFromExitCode(exitCode)
if exitCode == 0 && err != nil {
res = command.NewLocalErrorResult(err)
}
ch <- res
}()
return nil
}
76 changes: 75 additions & 1 deletion internal/pkg/subprocess/subprocess_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,41 @@

package subprocess

import "os"
import (
"context"
"os"
"os/exec"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/command"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr"

log "github.com/golang/glog"

winjob "github.com/kolesnikovae/go-winjob"
)

var (
jobObj *winjob.JobObject
)

// Subprocess setup on Windows creates a JobObject for keeping track of child processes
// created by reproxy. A cleanup function to close the job object is returned.
func Setup() (func(), error) {
var err error
// Create the job object configured to kill any remaining unfinished processes
// when it is closed.
jobObj, err = winjob.Create("", winjob.WithKillOnJobClose())
if err != nil {
return nil, err
}
// Cleanup function closes the job object, which should terminate all remaining processes
// that have not yet finished.
return func() {
if err := jobObj.Close(); err != nil {
log.Errorf("Failure closing job object: %v", err)
}
}, nil
}

// Exists returns true if a pid is assigned to a process that is actively running.
// In the windows case, a call to FindProcess should be sufficient, as it will return an
Expand All @@ -29,3 +63,43 @@ func Exists(pid int) (bool, error) {
p.Release()
return true, nil
}

// ExecuteInBackground executes the command in the background. Command result is written to the
// passed channel. For Windows, the child process will be added to a Job for tracking and guaranteed
// shutdown on reproxy close.
func (SystemExecutor) ExecuteInBackground(ctx context.Context, cmd *command.Command, oe outerr.OutErr, ch chan *command.Result) error {
cmdCtx, stdout, stderr, err := setupCommand(ctx, cmd)
if err != nil {
return err
}

if err = cmdCtx.Start(); err != nil {
log.V(2).Infof("Starting command %v >> err=%v", cmd.Args, err)
return err
}

if jobObj != nil {
if err = jobObj.Assign(cmdCtx.Process); err != nil {
log.Warningf("Failed to assign process %v (%v) to Job Object", cmd.Args[0], cmdCtx.Process)
}
}

go func() {
err := cmdCtx.Wait()
if err != nil {
log.V(2).Infof("Executed command %v\n >> err=%v", cmd.Args, err)
}
oe.WriteOut([]byte(stdout.String()))
oe.WriteErr([]byte(stderr.String()))
exitCode := 0
if exitErr, ok := err.(*exec.ExitError); ok && exitErr != nil {
exitCode = exitErr.ExitCode()
}
res := command.NewResultFromExitCode(exitCode)
if exitCode == 0 && err != nil {
res = command.NewLocalErrorResult(err)
}
ch <- res
}()
return nil
}

0 comments on commit 1157864

Please sign in to comment.