diff --git a/README.md b/README.md index 7ab2a46..dbaa6a9 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,37 @@ # orgalorg -Ultimate parallel cluster file synchronization tool. +Ultimate parallel cluster file synchronization tool and SSH commands +executioner. +![demo](https://raw.githubusercontent.com/reconquest/orgalorg/status-line/demo.gif) -# What +# Features -orgalorg provides easy way of synchronizing files acroess cluster. +* Zero-configuration. No config files. -orgalorg works through ssh & tar, so no unexpected protocol errors will arise. +* Running SSH commands or shell scripts on any number of hosts in parallel. All + output from nodes will be returned back, keeping stdout and stderr streams + mapping of original commands. -In default mode of operation (lately referred as sync mode) orgalorg will -perform following steps in order: +* Synchronizing files and directories across cluster with prior global cluster + locking. + After synchronization is done, arbitrary command can be evaluated. -1. Acquire global cluster lock (more detailed info above). -2. Create, upload and extract specified files in streaming mode to the - specified nodes into temporary run directory. -3. Start synchronization tool on each node, that should relocate files from - temporary run directory to the actual destination. +* Synchronizing files and directories with subsequent run of complex multi-step + scenario with steps synchronization across cluster. -So, orgalorg expected to work with third-party synchronization tool, that -will do actual files relocation and can be quite intricate, **but orgalorg can -work without that tool and perform simple files sync (more on this later)**. +* User-friendly progress indication. +* Both strict or loose modes of failover to be sure that everything will either + fail on any error or try to complete, no matter of what. + +* Interactive password authentication as well as SSH public key authentication. + +* Ability to run commands through `sudo`. + +* Grouped mode of output, so stdout and stderr from nodes will be grouped by + node name. Alternatively, output can be returned as soon as node returns + something. # Example usages @@ -30,19 +40,19 @@ host-specification arguments, like `-o host-a -o host-b`. ## Obtaining global cluster lock -``` +```bash orgalorg -o ... -L ``` ## Obtaining global cluster lock on custom directory -``` +```bash orgalorg -o ... -L -r /etc ``` ## Evaluating command on hosts in parallel -``` +```bash orgalorg -o ... -C uptime ``` @@ -51,34 +61,66 @@ orgalorg -o ... -C uptime `axfr` is a tool of your choice for retrieving domain information from your infrastructure DNS. -``` +```bash axfr | grep phpnode | orgalorg -s -C uptime ``` ## Evaluate command under root (passwordless sudo required) -``` +```bash orgalorg -o ... -x -C whoami ``` ## Copying SSH public key for remote authentication -``` +```bash orgalorg -o ... -p -i ~/.ssh/id_rsa.pub -C tee -a ~/.ssh/authorized_keys ``` ## Synchronizing configs and then reloading service (like nginx) -``` +```bash orgalorg -o ... -xn 'systemctl reload nginx' -S /etc/nginx.conf ``` ## Evaluating shell script -``` +```bash orgalorg -o ... -i script.bash -C bash ``` +## Install package on all nodes and get combined output from each node + +```bash +orgalorg -o ... -lx -C pacman -Sy my-package --noconfirm +``` + +## Evaluating shell oneliner + +```bash +orgalorg -o ... -C sleep '$(($RANDOM % 10))' '&&' echo done +``` + +# Description + +orgalorg provides easy way of synchronizing files across cluster and running +arbitrary SSH commands. + +orgalorg works through SSH & tar, so no unexpected protocol errors will arise. + +In default mode of operation (lately referred as sync mode) orgalorg will +perform steps in the following order: + +1. Acquire global cluster lock (check more detailed info above). +2. Create, upload and extract specified files in streaming mode to the + specified nodes into temporary run directory. +3. Start synchronization tool on each node, that should relocate files from + temporary run directory to the destination. + +So, orgalorg expected to work with third-party synchronization tool, that +will do actual files relocation and can be quite intricate, **but orgalorg can +work without that tool and perform simple files sync (more on this later)**. + ## Global Cluster Lock @@ -251,3 +293,33 @@ continue to the next step of execution process. <- ORGALORG:132464327653 SYNC [user@node2:1234] phase 1 completed ``` +# Testing + +To run tests it's enough to: + +``` +./run_tests +``` + +## Requirements + +Testcases are run through [tests.sh](https://github.com/reconquest/tests.sh) +library. + +For every testcase new set of temporary containers will be initialized through +[hastur](https://github.com/seletskiy/hastur), so `systemd` is required for +running test suite. + +orgalorg testcases are close to reality as possible, so orgalorg will really +connect via SSH to cluster of containers in each testcase. + +## Coverage + +Run following command to calculate total coverage (available after running +testsuite): + +```bash +make coverage.total +``` + +Current coverage level is something about **85%**. diff --git a/archive.go b/archive.go index 50c614d..b0665fa 100644 --- a/archive.go +++ b/archive.go @@ -18,6 +18,7 @@ func startArchiveReceivers( cluster *distributedLock, rootDir string, sudo bool, + serial bool, ) (*remoteExecution, error) { var ( command = []string{} @@ -39,7 +40,7 @@ func startArchiveReceivers( logMutex := &sync.Mutex{} - runner := &remoteExecutionRunner{command: command} + runner := &remoteExecutionRunner{command: command, serial: serial} execution, err := runner.run( cluster, @@ -70,7 +71,7 @@ func startArchiveReceivers( func archiveFilesToWriter( target io.WriteCloser, - files []string, + files []file, preserveUID, preserveGID bool, ) error { workDir, err := os.Getwd() @@ -81,17 +82,55 @@ func archiveFilesToWriter( ) } + status := &struct { + Phase string + Total int + Fails int + Success int + Written bytesStringer + Bytes bytesStringer + }{ + Phase: "upload", + Total: len(files), + } + + setStatus(status) + + for _, file := range files { + status.Bytes.Amount += file.size + } + archive := tar.NewWriter(target) - for fileIndex, fileName := range files { + stream := io.MultiWriter(archive, callbackWriter( + func(data []byte) (int, error) { + status.Written.Amount += len(data) + + err = bar.Render(os.Stderr) + if err != nil { + errorf( + `%s`, + hierr.Errorf( + err, + `can't render status bar`, + ), + ) + } + + return len(data), nil + }, + )) + + for fileIndex, file := range files { infof( "%5d/%d sending file: '%s'", fileIndex+1, len(files), - fileName, + file.path, ) err = writeFileToArchive( - fileName, + file.path, + stream, archive, workDir, preserveUID, @@ -101,9 +140,11 @@ func archiveFilesToWriter( return hierr.Errorf( err, `can't write file to archive: '%s'`, - fileName, + file.path, ) } + + status.Success++ } tracef("closing archive stream, %d files sent", len(files)) @@ -129,6 +170,7 @@ func archiveFilesToWriter( func writeFileToArchive( fileName string, + stream io.Writer, archive *tar.Writer, workDir string, preserveUID, preserveGID bool, @@ -207,7 +249,7 @@ func writeFileToArchive( ) } - _, err = io.Copy(archive, fileToArchive) + _, err = io.Copy(stream, fileToArchive) if err != nil { return hierr.Errorf( err, @@ -219,8 +261,8 @@ func writeFileToArchive( return nil } -func getFilesList(relative bool, sources ...string) ([]string, error) { - files := []string{} +func getFilesList(relative bool, sources ...string) ([]file, error) { + files := []file{} for _, source := range sources { err := filepath.Walk( @@ -245,7 +287,10 @@ func getFilesList(relative bool, sources ...string) ([]string, error) { } } - files = append(files, path) + files = append(files, file{ + path: path, + size: int(info.Size()), + }) return nil }, diff --git a/bytes_stringer.go b/bytes_stringer.go new file mode 100644 index 0000000..59188c6 --- /dev/null +++ b/bytes_stringer.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" +) + +type bytesStringer struct { + Amount int +} + +func (stringer bytesStringer) String() string { + amount := float64(stringer.Amount) + + suffixes := map[string]string{ + "b": "KiB", + "KiB": "MiB", + "MiB": "GiB", + "GiB": "TiB", + } + + suffix := "b" + for amount >= 1024 { + if newSuffix, ok := suffixes[suffix]; ok { + suffix = newSuffix + } else { + break + } + + amount /= 1024 + } + + return fmt.Sprintf("%.2f%s", amount, suffix) +} diff --git a/callback_writer.go b/callback_writer.go new file mode 100644 index 0000000..08980e6 --- /dev/null +++ b/callback_writer.go @@ -0,0 +1,9 @@ +package main + +type ( + callbackWriter func([]byte) (int, error) +) + +func (writer callbackWriter) Write(data []byte) (int, error) { + return writer(data) +} diff --git a/command.go b/command.go index e702df7..aff38d8 100644 --- a/command.go +++ b/command.go @@ -32,16 +32,22 @@ func runRemoteExecution( lockedNodes *distributedLock, command string, setupCallback func(*remoteExecutionNode), + serial bool, ) (*remoteExecution, error) { var ( stdins = []io.WriteCloser{} logLock = &sync.Mutex{} stdinsLock = &sync.Mutex{} + outputLock = &sync.Mutex{} nodes = &remoteNodes{&sync.Mutex{}, remoteNodesMap{}} ) + if !serial { + outputLock = nil + } + errors := make(chan error, 0) for _, node := range lockedNodes.nodes { go func(node *distributedLockNode) { @@ -59,6 +65,7 @@ func runRemoteExecution( node, command, logLock, + outputLock, ) if err != nil { errors <- err @@ -115,7 +122,8 @@ func runRemoteExecution( func runRemoteExecutionNode( node *distributedLockNode, command string, - logLock *sync.Mutex, + logLock sync.Locker, + outputLock sync.Locker, ) (*remoteExecutionNode, error) { remoteCommand, err := node.runner.Command(command) if err != nil { @@ -174,7 +182,7 @@ func runRemoteExecutionNode( stdout = lineflushwriter.New( prefixwriter.New( newDebugWriter(logger), - node.String()+" {cmd} ", + "{cmd} "+node.String()+" ", ), logLock, false, @@ -183,18 +191,28 @@ func runRemoteExecutionNode( stderr = lineflushwriter.New( prefixwriter.New( newDebugWriter(logger), - node.String()+" {cmd} ", + "{cmd} "+node.String()+" ", ), logLock, false, ) } + stdout = &statusBarUpdateWriter{stdout} + stderr = &statusBarUpdateWriter{stderr} + + if outputLock != (*sync.Mutex)(nil) { + sharedLock := newSharedLock(outputLock, 2) + + stdout = newLockedWriter(stdout, sharedLock) + stderr = newLockedWriter(stderr, sharedLock) + } + stdin, err := remoteCommand.StdinPipe() if err != nil { return nil, hierr.Errorf( err, - `can't get stdin from archive receiver command`, + `can't get stdin from remote command`, ) } diff --git a/debug.go b/debug.go deleted file mode 100644 index e3af256..0000000 --- a/debug.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/seletskiy/hierr" -) - -func tracef(format string, args ...interface{}) { - if verbose < verbosityTrace { - return - } - - args = serializeErrors(args) - - logger.Debugf(format, args...) -} - -func debugf(format string, args ...interface{}) { - args = serializeErrors(args) - - logger.Debugf(format, args...) -} - -func infof(format string, args ...interface{}) { - args = serializeErrors(args) - - logger.Infof(format, args...) -} - -func warningf(format string, args ...interface{}) { - args = serializeErrors(args) - - if verbose <= verbosityQuiet { - return - } - - logger.Warningf(format, args...) -} - -func errorf(format string, args ...interface{}) { - args = serializeErrors(args) - - logger.Errorf(format, args...) -} - -func serializeErrors(args []interface{}) []interface{} { - for i, arg := range args { - if err, ok := arg.(error); ok { - args[i] = serializeError(err) - } - } - - return args -} - -func serializeError(err error) string { - if format == outputFormatText { - return fmt.Sprint(err) - } - - if hierarchicalError, ok := err.(hierr.Error); ok { - serializedError := fmt.Sprint(hierarchicalError.Nested) - if nested, ok := hierarchicalError.Nested.(error); ok { - serializedError = serializeError(nested) - } - - return hierarchicalError.Message + ": " + serializedError - } - - return err.Error() -} diff --git a/demo.gif b/demo.gif new file mode 100644 index 0000000..9a68197 Binary files /dev/null and b/demo.gif differ diff --git a/distributed_lock_node.go b/distributed_lock_node.go index c76a862..8b84b66 100644 --- a/distributed_lock_node.go +++ b/distributed_lock_node.go @@ -38,10 +38,11 @@ func (node *distributedLockNode) lock( filename string, ) error { lockCommandString := fmt.Sprintf( - `sh -c $'`+ - `flock -nx %s -c \'`+ - `printf "%s\\n" && cat\' || `+ - `printf "%s\\n"'`, + `sh -c "`+ + `flock -nx %s -c '`+ + `printf \"%s\\n\" && cat' || `+ + `printf \"%s\\n\"`+ + `"`, filename, lockAcquiredString, lockLockedString, diff --git a/file.go b/file.go new file mode 100644 index 0000000..0afca78 --- /dev/null +++ b/file.go @@ -0,0 +1,6 @@ +package main + +type file struct { + path string + size int +} diff --git a/format.go b/format.go index f8b5917..de6107d 100644 --- a/format.go +++ b/format.go @@ -1,8 +1,7 @@ package main import ( - "encoding/json" - "io" + "github.com/reconquest/loreley" ) type ( @@ -14,47 +13,29 @@ const ( outputFormatJSON ) -func parseOutputFormat(args map[string]interface{}) outputFormat { +func parseOutputFormat( + args map[string]interface{}, +) outputFormat { + + formatType := outputFormatText if args["--json"].(bool) { - return outputFormatJSON + formatType = outputFormatJSON } - return outputFormatText -} - -type jsonOutputWriter struct { - stream string - node string - - output io.Writer + return formatType } -func (writer *jsonOutputWriter) Write(data []byte) (int, error) { - if len(data) == 0 { - return 0, nil - } +func parseColorMode(args map[string]interface{}) loreley.ColorizeMode { + switch args["--color"].(string) { + case "always": + return loreley.ColorizeAlways - message := map[string]interface{}{ - "stream": writer.stream, - } - - if writer.node == "" { - message["node"] = nil - } else { - message["node"] = writer.node - } - - message["body"] = string(data) - - jsonMessage, err := json.Marshal(message) - if err != nil { - return 0, err - } + case "auto": + return loreley.ColorizeOnTTY - _, err = writer.output.Write(append(jsonMessage, '\n')) - if err != nil { - return 0, err + case "never": + return loreley.ColorizeNever } - return len(data), nil + return loreley.ColorizeNever } diff --git a/json_output_writer.go b/json_output_writer.go new file mode 100644 index 0000000..4520422 --- /dev/null +++ b/json_output_writer.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" + "io" +) + +type jsonOutputWriter struct { + stream string + node string + + output io.Writer +} + +func (writer *jsonOutputWriter) Write(data []byte) (int, error) { + if len(data) == 0 { + return 0, nil + } + + message := map[string]interface{}{ + "stream": writer.stream, + } + + if writer.node == "" { + message["node"] = nil + } else { + message["node"] = writer.node + } + + message["body"] = string(data) + + jsonMessage, err := json.Marshal(message) + if err != nil { + return 0, err + } + + _, err = writer.output.Write(append(jsonMessage, '\n')) + if err != nil { + return 0, err + } + + return len(data), nil +} diff --git a/lock.go b/lock.go index 2cada6f..f427ef2 100644 --- a/lock.go +++ b/lock.go @@ -28,14 +28,23 @@ func acquireDistributedLock( var ( cluster = &distributedLock{} - connections = int64(0) - failures = int64(0) - errors = make(chan error, 0) nodeAddMutex = &sync.Mutex{} ) + status := &struct { + Phase string + Total int64 + Fails int64 + Success int64 + }{ + Phase: `lock`, + Total: int64(len(addresses)), + } + + setStatus(status) + for _, nodeAddress := range addresses { go func(nodeAddress address) { pool.run(func() { @@ -43,7 +52,8 @@ func acquireDistributedLock( node, err := connectToNode(cluster, runnerFactory, nodeAddress) if err != nil { - atomic.AddInt64(&failures, 1) + atomic.AddInt64(&status.Fails, 1) + atomic.AddInt64(&status.Total, -1) if noConnFail { failed = true @@ -56,7 +66,6 @@ func acquireDistributedLock( err = node.lock(lockFile) if err != nil { if noLockFail { - failed = true warningf("%s", err) } else { errors <- err @@ -65,11 +74,11 @@ func acquireDistributedLock( } } - status := "established" + textStatus := "established" if failed { - status = "failed" + textStatus = "failed" } else { - atomic.AddInt64(&connections, 1) + atomic.AddInt64(&status.Success, 1) nodeAddMutex.Lock() defer nodeAddMutex.Unlock() @@ -79,10 +88,10 @@ func acquireDistributedLock( debugf( `%4d/%d (%d failed) connection %s: %s`, - connections, - int64(len(addresses))-failures, - failures, - status, + status.Success, + status.Total, + status.Fails, + textStatus, nodeAddress, ) @@ -96,7 +105,7 @@ func acquireDistributedLock( for range addresses { err := <-errors if err != nil { - erronous += 1 + erronous++ topError = hierr.Push(topError, err) } @@ -127,6 +136,9 @@ func connectToNode( go func() { select { + case <-done: + return + case <-time.After(longConnectionWarningTimeout): warningf( "still connecting to address after %s: %s", @@ -134,10 +146,12 @@ func connectToNode( address, ) - case <-done: + <-done } + }() - return + defer func() { + done <- struct{}{} }() runner, err := runnerFactory(address) @@ -149,8 +163,6 @@ func connectToNode( ) } - done <- struct{}{} - return &distributedLockNode{ address: address, runner: runner, diff --git a/locked_writer.go b/locked_writer.go new file mode 100644 index 0000000..b4ae5e3 --- /dev/null +++ b/locked_writer.go @@ -0,0 +1,87 @@ +package main + +import ( + "io" + "sync" +) + +type sharedLock struct { + sync.Locker + + held *struct { + sync.Locker + + clients int + locked bool + } +} + +func newSharedLock(lock sync.Locker, clients int) *sharedLock { + return &sharedLock{ + Locker: lock, + + held: &struct { + sync.Locker + + clients int + locked bool + }{ + Locker: &sync.Mutex{}, + + clients: clients, + locked: false, + }, + } +} + +func (mutex *sharedLock) Lock() { + mutex.held.Lock() + defer mutex.held.Unlock() + + if !mutex.held.locked { + mutex.Locker.Lock() + + mutex.held.locked = true + } +} + +func (mutex *sharedLock) Unlock() { + mutex.held.Lock() + defer mutex.held.Unlock() + + mutex.held.clients-- + + if mutex.held.clients == 0 && mutex.held.locked { + mutex.held.locked = false + + mutex.Locker.Unlock() + } +} + +type lockedWriter struct { + writer io.WriteCloser + + lock sync.Locker +} + +func newLockedWriter( + writer io.WriteCloser, + lock sync.Locker, +) *lockedWriter { + return &lockedWriter{ + writer: writer, + lock: lock, + } +} + +func (writer *lockedWriter) Write(data []byte) (int, error) { + writer.lock.Lock() + + return writer.writer.Write(data) +} + +func (writer *lockedWriter) Close() error { + writer.lock.Unlock() + + return writer.writer.Close() +} diff --git a/log.go b/log.go new file mode 100644 index 0000000..c5413a1 --- /dev/null +++ b/log.go @@ -0,0 +1,217 @@ +package main + +import ( + "bytes" + "fmt" + "os" + "strings" + + "github.com/kovetskiy/lorg" + "github.com/reconquest/loreley" + "github.com/seletskiy/hierr" +) + +var ( + loggerFormattingBasicLength = 0 +) + +func setLoggerOutputFormat(logger *lorg.Log, format outputFormat) { + if format == outputFormatJSON { + logger.SetOutput(&jsonOutputWriter{ + stream: `stderr`, + node: ``, + output: os.Stderr, + }) + } +} + +func setLoggerVerbosity(level verbosity, logger *lorg.Log) { + logger.SetLevel(lorg.LevelWarning) + + switch { + case level >= verbosityTrace: + logger.SetLevel(lorg.LevelTrace) + + case level >= verbosityDebug: + logger.SetLevel(lorg.LevelDebug) + + case level >= verbosityNormal: + logger.SetLevel(lorg.LevelInfo) + } +} + +func setLoggerStyle(logger *lorg.Log, style lorg.Formatter) { + logger.SetFormat(style) + + buffer := &bytes.Buffer{} + logger.SetOutput(buffer) + + logger.Debug(``) + + loggerFormattingBasicLength = len(strings.TrimSuffix( + loreley.TrimStyles(buffer.String()), + "\n", + )) + + logger.SetOutput(os.Stderr) +} + +func tracef(format string, args ...interface{}) { + args = serializeErrors(args) + + logger.Tracef(`%s`, wrapNewLines(format, args...)) + + drawStatus() +} + +func debugf(format string, args ...interface{}) { + args = serializeErrors(args) + + logger.Debugf(`%s`, wrapNewLines(format, args...)) + + drawStatus() +} + +func infof(format string, args ...interface{}) { + args = serializeErrors(args) + + logger.Infof(`%s`, wrapNewLines(format, args...)) + + drawStatus() +} + +func warningf(format string, args ...interface{}) { + args = serializeErrors(args) + + if verbose <= verbosityQuiet { + return + } + + logger.Warningf(`%s`, wrapNewLines(format, args...)) + + drawStatus() +} + +func errorf(format string, args ...interface{}) { + args = serializeErrors(args) + + logger.Errorf(`%s`, wrapNewLines(format, args...)) +} + +func fatalf(format string, args ...interface{}) { + args = serializeErrors(args) + + logger.Fatalf(`%s`, wrapNewLines(format, args...)) + + exit(1) +} + +func wrapNewLines(format string, values ...interface{}) string { + contents := fmt.Sprintf(format, values...) + contents = strings.TrimSuffix(contents, "\n") + contents = strings.Replace( + contents, + "\n", + "\n"+strings.Repeat(" ", loggerFormattingBasicLength), + -1, + ) + + return contents +} + +func serializeErrors(args []interface{}) []interface{} { + for i, arg := range args { + if err, ok := arg.(error); ok { + args[i] = serializeError(err) + } + } + + return args +} + +func setStatus(status interface{}) { + if bar == nil { + return + } + + bar.SetStatus(status) +} + +func shouldDrawStatus() bool { + if bar == nil { + return false + } + + if format != outputFormatText { + return false + } + + if verbose <= verbosityQuiet { + return false + } + + return true +} + +func drawStatus() { + if !shouldDrawStatus() { + return + } + + err := bar.Render(os.Stderr) + if err != nil { + errorf( + "%s", hierr.Errorf( + err, + `can't draw status bar`, + ), + ) + } +} + +func clearStatus() { + if !shouldDrawStatus() { + return + } + + bar.Clear(os.Stderr) +} + +func serializeError(err error) string { + if format == outputFormatText { + return fmt.Sprint(err) + } + + if hierarchicalError, ok := err.(hierr.Error); ok { + serializedError := fmt.Sprint(hierarchicalError.Nested) + switch nested := hierarchicalError.Nested.(type) { + case error: + serializedError = serializeError(nested) + + case []hierr.NestedError: + serializeErrorParts := []string{} + + for _, nestedPart := range nested { + serializedPart := fmt.Sprint(nestedPart) + switch part := nestedPart.(type) { + case error: + serializedPart = serializeError(part) + + case string: + serializedPart = part + } + + serializeErrorParts = append( + serializeErrorParts, + serializedPart, + ) + } + + serializedError = strings.Join(serializeErrorParts, "; ") + } + + return hierarchicalError.Message + ": " + serializedError + } + + return err.Error() +} diff --git a/main.go b/main.go index a775c54..bf5a33d 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,8 @@ import ( "github.com/docopt/docopt-go" "github.com/kovetskiy/lorg" "github.com/mattn/go-shellwords" + "github.com/reconquest/barely" + "github.com/reconquest/loreley" "github.com/seletskiy/hierr" "github.com/theairkit/runcmd" ) @@ -107,7 +109,7 @@ Options: [default: $HOME/.ssh/id_rsa] -p --password Enable password authentication. Exclude '-k' option. - TTY is required for reading password. + Interactive TTY is required for reading password. -x --sudo Obtain root via 'sudo -n'. By default, orgalorg will not obtain root and do all actions from specified user. To change that @@ -122,6 +124,10 @@ Options: -u --user Username used for connecting to all hosts by default. [default: $USER] -i --stdin Pass specified file as input for the command. + -l --serial Run commands in serial mode, so they output will not + interleave each other. Only one node is allowed to + output, all other nodes will wait that node to + finish. -q --quiet Be quiet, in command mode do not use prefixes. -v --verbose Print debug information on stderr. -V --version Print program version. @@ -153,17 +159,39 @@ Advanced options: shell wrapper will be used. If any args are given using '-g', they will be appended to shell invocation. - [default: bash -c $'{}'] + [default: bash -c '{}'] -d --threads Set threads count which will be used for connection, locking and execution commands. [default: 16]. + --no-preserve-uid Do not preserve UIDs for transferred files. + --no-preserve-gid Do not preserve GIDs for transferred files. + +Output format and colors options: --json Output everything in line-by-line JSON format, printing objects with fields: * 'stream' = 'stdout' | 'stderr'; * 'node' = | null (if internal output); * 'body' = - --no-preserve-uid Do not preserve UIDs for transferred files. - --no-preserve-gid Do not preserve GIDs for transferred files. + --bar-format Format for the status bar. + Full Go template syntax is available with delims + of '{' and '}'. + See https://github.com/reconquest/barely for more + info. + For example, run orgalorg with '-vv' flag. + Two embedded themes are available by their names: + ` + themeDark + ` and ` + themeLight + ` + [default: ` + themeDark + `] + --log-format Format for the logs. + See https://github.com/reconquest/colorgful for more + info. + [default: ` + themeDark + `] + --dark Set all available formats to predefined dark theme. + --light Set all available formats to predefined light theme. + --color Specify, whether to use colors: + * never - disable colors; + * auto - use colors only when TTY presents. + * always - always use colorized output. + [default: auto] Timeout options: --conn-timeout Remote host connection timeout in milliseconds. @@ -180,8 +208,7 @@ Timeout options: ` const ( - defaultSSHPort = 22 - sshPasswordPrompt = "Password: " + defaultSSHPort = 22 // heartbeatTimeoutCoefficient will be multiplied to send timeout and // resulting value will be used as time interval between heartbeats. @@ -192,12 +219,17 @@ const ( defaultLockFile = "/" ) +var ( + sshPasswordPrompt = "Password: " +) + var ( logger = lorg.NewLog() verbose = verbosityNormal format = outputFormatText pool *threadPool + bar *barely.StatusBar ) var ( @@ -205,22 +237,7 @@ var ( ) func main() { - logger.SetFormat(lorg.NewFormat("* ${time} ${level:[%s]:right} %s")) - - usage, err := formatUsage(usage) - if err != nil { - logger.Error(hierr.Errorf( - err, - `can't format usage`, - )) - - exit(1) - } - - args, err := docopt.Parse(usage, nil, true, version, true) - if err != nil { - panic(err) - } + args := parseArgs() verbose = parseVerbosity(args) @@ -228,7 +245,19 @@ func main() { format = parseOutputFormat(args) - setLoggerOutputFormat(format, logger) + setLoggerOutputFormat(logger, format) + + loreley.Colorize = parseColorMode(args) + + loggerStyle, err := getLoggerTheme(parseTheme("log", args)) + if err != nil { + fatalf("%s", hierr.Errorf( + err, + `can't use given logger style`, + )) + } + + setLoggerStyle(logger, loggerStyle) poolSize, err := parseThreadPoolSize(args) if err != nil { @@ -240,6 +269,21 @@ func main() { pool = newThreadPool(poolSize) + barStyle, err := getStatusBarTheme(parseTheme("bar", args)) + if err != nil { + errorf("%s", hierr.Errorf( + err, + `can't use given status bar style`, + )) + } + + if loreley.HasTTY(int(os.Stderr.Fd())) { + bar = barely.NewStatusBar(barStyle.Template) + } else { + bar = nil + sshPasswordPrompt = "" + } + switch { case args["--upload"].(bool): fallthrough @@ -252,31 +296,25 @@ func main() { } if err != nil { - errorf("%s", err) - - exit(1) + fatalf("%s", err) } } -func setLoggerOutputFormat(format outputFormat, logger *lorg.Log) { - if format == outputFormatJSON { - logger.SetOutput(&jsonOutputWriter{ - stream: `stderr`, - node: ``, - output: os.Stderr, - }) +func parseArgs() map[string]interface{} { + usage, err := formatUsage(string(usage)) + if err != nil { + fatalf("%s", hierr.Errorf( + err, + `can't format usage`, + )) } -} - -func setLoggerVerbosity(level verbosity, logger *lorg.Log) { - logger.SetLevel(lorg.LevelWarning) - switch { - case level >= verbosityDebug: - logger.SetLevel(lorg.LevelDebug) - case level >= verbosityNormal: - logger.SetLevel(lorg.LevelInfo) + args, err := docopt.Parse(usage, nil, true, version, true) + if err != nil { + panic(err) } + + return args } func formatUsage(template string) (string, error) { @@ -304,6 +342,7 @@ func handleEvaluate(args map[string]interface{}) error { rootDir, _ = args["--root"].(string) sudo = args["--sudo"].(bool) shell = args["--shell"].(string) + serial = args["--serial"].(bool) command = args[""].([]string) ) @@ -320,6 +359,7 @@ func handleEvaluate(args map[string]interface{}) error { sudo: sudo, command: command, directory: rootDir, + serial: serial, } return run(cluster, runner, stdin) @@ -374,7 +414,9 @@ func run( if err != nil { return hierr.Errorf( err, - `remote execution failed`, + `remote execution failed, because one of `+ + `command has been exited with non-zero exit `+ + `code at least on one node`, ) } @@ -396,13 +438,14 @@ func handleSynchronize(args map[string]interface{}) error { shell = args["--shell"].(string) - sudo = args["--sudo"].(bool) + sudo = args["--sudo"].(bool) + serial = args["--serial"].(bool) fileSources = args[""].([]string) ) var ( - filesList = []string{} + filesList = []file{} err error ) @@ -469,6 +512,7 @@ func handleSynchronize(args map[string]interface{}) error { command: command, args: commandArgs, directory: rootDir, + serial: serial, } if isSimpleCommand { @@ -489,7 +533,7 @@ func handleSynchronize(args map[string]interface{}) error { func upload( args map[string]interface{}, cluster *distributedLock, - filesList []string, + filesList []file, ) error { var ( rootDir, _ = args["--root"].(string) @@ -497,6 +541,8 @@ func upload( preserveUID = !args["--no-preserve-uid"].(bool) preserveGID = !args["--no-preserve-gid"].(bool) + + serial = args["--serial"].(bool) ) if rootDir == "" { @@ -505,7 +551,7 @@ func upload( debugf(`file upload started into: '%s'`, rootDir) - receivers, err := startArchiveReceivers(cluster, rootDir, sudo) + receivers, err := startArchiveReceivers(cluster, rootDir, sudo, serial) if err != nil { return hierr.Errorf( err, @@ -753,7 +799,9 @@ func readPassword(prompt string) (string, error) { ) } - fmt.Fprintln(os.Stderr) + if sshPasswordPrompt != "" { + fmt.Fprintln(os.Stderr) + } return string(password), nil } diff --git a/remote_execution.go b/remote_execution.go index 23c13f8..b6a8333 100644 --- a/remote_execution.go +++ b/remote_execution.go @@ -3,6 +3,7 @@ package main import ( "fmt" "io" + "reflect" "github.com/seletskiy/hierr" ) @@ -19,27 +20,40 @@ type remoteExecutionResult struct { } func (execution *remoteExecution) wait() error { - tracef("waiting %d nodes to finish", len(execution.nodes)) + tracef(`waiting %d nodes to finish`, len(execution.nodes)) results := make(chan *remoteExecutionResult, 0) for _, node := range execution.nodes { go func(node *remoteExecutionNode) { - pool.run(func() { - results <- &remoteExecutionResult{node, node.wait()} - }) + results <- &remoteExecutionResult{node, node.wait()} }(node) } executionErrors := fmt.Errorf( - `can't run remote commands on %d nodes`, - len(execution.nodes), + `commands are exited with non-zero code`, ) - erroneous := false + var ( + status = &struct { + Phase string + Total int + Fails int + Success int + }{ + Phase: `exec`, + Total: len(execution.nodes), + } + + exitCodes = map[int]int{} + ) + + setStatus(status) for range execution.nodes { result := <-results if result.err != nil { + exitCodes[result.node.exitCode]++ + executionErrors = hierr.Push( executionErrors, hierr.Errorf( @@ -49,7 +63,14 @@ func (execution *remoteExecution) wait() error { ), ) - erroneous = true + status.Fails++ + status.Total-- + + tracef( + `%s finished with exit code: '%d'`, + result.node.node.String(), + result.node.exitCode, + ) continue } @@ -60,8 +81,35 @@ func (execution *remoteExecution) wait() error { ) } - if erroneous { - return executionErrors + if status.Fails > 0 { + if status.Fails == len(execution.nodes) { + exitCodesValue := reflect.ValueOf(exitCodes) + + topError := fmt.Errorf( + `commands are exited with non-zero exit code on all %d nodes`, + len(execution.nodes), + ) + + for _, key := range exitCodesValue.MapKeys() { + topError = hierr.Push( + topError, + fmt.Sprintf( + `code %d (%d nodes)`, + key.Int(), + exitCodesValue.MapIndex(key).Int(), + ), + ) + } + + return topError + } + + return hierr.Errorf( + executionErrors, + `commands are exited with non-zero exit code on %d of %d nodes`, + status.Fails, + len(execution.nodes), + ) } return nil diff --git a/remote_execution_node.go b/remote_execution_node.go index 306388c..d550499 100644 --- a/remote_execution_node.go +++ b/remote_execution_node.go @@ -17,6 +17,8 @@ type remoteExecutionNode struct { stdin io.WriteCloser stdout io.WriteCloser stderr io.WriteCloser + + exitCode int } func (node *remoteExecutionNode) wait() error { @@ -24,12 +26,14 @@ func (node *remoteExecutionNode) wait() error { if err != nil { _ = node.stdout.Close() _ = node.stderr.Close() - if sshErr, ok := err.(*ssh.ExitError); ok { + if sshErrors, ok := err.(*ssh.ExitError); ok { + node.exitCode = sshErrors.Waitmsg.ExitStatus() + return fmt.Errorf( `%s had failed to evaluate command, `+ `remote command exited with non-zero code: %d`, node.node.String(), - sshErr.Waitmsg.ExitStatus(), + node.exitCode, ) } diff --git a/remote_execution_runner.go b/remote_execution_runner.go index 6e56ec9..d4a9626 100644 --- a/remote_execution_runner.go +++ b/remote_execution_runner.go @@ -10,11 +10,12 @@ var ( ) type remoteExecutionRunner struct { - shell string - sudo bool command []string args []string + shell string directory string + sudo bool + serial bool } func (runner *remoteExecutionRunner) run( @@ -38,7 +39,7 @@ func (runner *remoteExecutionRunner) run( command = joinCommand(sudoCommand) + " " + command } - return runRemoteExecution(cluster, command, setupCallback) + return runRemoteExecution(cluster, command, setupCallback, runner.serial) } func wrapCommandIntoShell(command string, shell string, args []string) string { @@ -71,8 +72,7 @@ func joinCommand(command []string) string { } func escapeCommandArgument(argument string) string { - argument = strings.Replace(argument, `\`, `\\`, -1) - argument = strings.Replace(argument, ` `, `\ `, -1) + argument = strings.Replace(argument, `'`, `'\''`, -1) return argument } @@ -81,6 +81,7 @@ func escapeCommandArgumentStrict(argument string) string { argument = strings.Replace(argument, `\`, `\\`, -1) argument = strings.Replace(argument, "`", "\\`", -1) argument = strings.Replace(argument, `"`, `\"`, -1) + argument = strings.Replace(argument, `'`, `'\''`, -1) argument = strings.Replace(argument, `$`, `\$`, -1) return `"` + argument + `"` diff --git a/status_bar_update_writer.go b/status_bar_update_writer.go new file mode 100644 index 0000000..6513e4e --- /dev/null +++ b/status_bar_update_writer.go @@ -0,0 +1,23 @@ +package main + +import ( + "io" +) + +type statusBarUpdateWriter struct { + writer io.WriteCloser +} + +func (writer *statusBarUpdateWriter) Write(data []byte) (int, error) { + clearStatus() + + written, err := writer.writer.Write(data) + + drawStatus() + + return written, err +} + +func (writer *statusBarUpdateWriter) Close() error { + return writer.writer.Close() +} diff --git a/tests/testcases/can-skip-unreachable-servers-if-flag-is-given.test.sh b/tests/testcases/can-skip-unreachable-servers-if-flag-is-given.test.sh index 1bc1f83..cd60e3d 100644 --- a/tests/testcases/can-skip-unreachable-servers-if-flag-is-given.test.sh +++ b/tests/testcases/can-skip-unreachable-servers-if-flag-is-given.test.sh @@ -2,7 +2,7 @@ tests:not tests:ensure :orgalorg:with-key -o example.com -C whoami tests:ensure :orgalorg:with-key -o example.com -w -C whoami -tests:assert-stderr-re "WARNING.*can't connect to address.*example.com" +tests:assert-stderr-re "WARN.*can't connect to address.*example.com" :check-node-output() { local container_ip="$2" diff --git a/tests/testcases/commandline/can-handle-common-mistakes-in-arguments.test.sh b/tests/testcases/commandline/can-handle-common-mistakes-in-arguments.test.sh index 0783fbc..b126d85 100644 --- a/tests/testcases/commandline/can-handle-common-mistakes-in-arguments.test.sh +++ b/tests/testcases/commandline/can-handle-common-mistakes-in-arguments.test.sh @@ -5,10 +5,6 @@ tests:not tests:ensure :orgalorg -o ./blah -C echo blah tests:assert-stderr-re "can't open.*blah" tests:assert-stderr-re "blah.*no such file or directory" -tests:not tests:ensure :orgalorg -p -s -C echo blah -tests:assert-stderr-re "incompatible options" -tests:assert-stderr-re "password authentication is not possible.*stdin" - tests:not tests:ensure :orgalorg -o blah --send-timeout=wazup -C echo dunno tests:assert-stderr-re "send timeout to number" diff --git a/tests/testcases/commands/can-aggregate-errors-on-the-end-of-execution.test.sh b/tests/testcases/commands/can-aggregate-errors-on-the-end-of-execution.test.sh index a81310c..c5fc11c 100644 --- a/tests/testcases/commands/can-aggregate-errors-on-the-end-of-execution.test.sh +++ b/tests/testcases/commands/can-aggregate-errors-on-the-end-of-execution.test.sh @@ -1,10 +1,3 @@ tests:not tests:ensure :orgalorg:with-key -e -C echo 1 '&&' exit 1 - -:check-node-output() { - local container_ip="$2" - - tests:assert-stdout "$container_ip 1" - tests:assert-stderr-re "$container_ip.*non-zero code: 1" -} - -containers:do :check-node-output +tests:assert-stderr-re "exited with non-zero.*all $(containers:count) nodes" +tests:assert-stderr "code 1 ($(containers:count) nodes)" diff --git a/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh b/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh index 9b00d00..96637d0 100644 --- a/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh +++ b/tests/testcases/commands/can-escape-space-in-the-remote-command.test.sh @@ -1,3 +1,3 @@ -tests:ensure :orgalorg:with-key -e -C echo 'two spaces' +tests:ensure :orgalorg:with-key -e -C echo '"two spaces"' tests:assert-stdout "two spaces" diff --git a/tests/testcases/commands/should-properly-escape-shell-arguments.test.sh b/tests/testcases/commands/should-properly-escape-shell-arguments.test.sh new file mode 100644 index 0000000..03f7e13 --- /dev/null +++ b/tests/testcases/commands/should-properly-escape-shell-arguments.test.sh @@ -0,0 +1,8 @@ +tests:ensure echo "'1'" +tests:ensure :orgalorg:with-key -e -C echo "'1'" + +tests:assert-stdout-re "1$" + +tests:ensure :orgalorg:with-key -e -C echo "\\'" + +tests:assert-stdout-re "'$" diff --git a/tests/testcases/commands/should-return-non-zero-exit-code-if-command-fails.test.sh b/tests/testcases/commands/should-return-non-zero-exit-code-if-command-fails.test.sh index 23b4d69..e1c3c5b 100644 --- a/tests/testcases/commands/should-return-non-zero-exit-code-if-command-fails.test.sh +++ b/tests/testcases/commands/should-return-non-zero-exit-code-if-command-fails.test.sh @@ -1,4 +1,4 @@ tests:not tests:ensure :orgalorg:with-key -e -C exit 17 -tests:assert-stderr "failed to evaluate" -tests:assert-stderr "non-zero code: 17" +tests:assert-stderr "remote execution failed" +tests:assert-stderr "code 17 ($(containers:count) nodes)" diff --git a/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh b/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh index 004efb6..24d749c 100644 --- a/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh +++ b/tests/testcases/locking/will-continue-execution-when-lock-failed-if-flag-is-specified.test.sh @@ -3,7 +3,6 @@ tests:involve tests/testcases/locking/lock.sh :orgalorg:lock orgalorg_output orgalorg_pid tests:ensure :orgalorg:with-key --no-lock-fail -C -- echo 1 -tests:assert-stderr "proceeding with execution" tests:assert-stdout "1" pkill -INT -P "$orgalorg_pid" diff --git a/tests/testcases/verbosity/can-log-stderr-and-stdout-from-remote-at-verbose-level-to-stderr.test.sh b/tests/testcases/verbosity/can-log-stderr-and-stdout-from-remote-at-verbose-level-to-stderr.test.sh index a75f3c1..1dbaa95 100644 --- a/tests/testcases/verbosity/can-log-stderr-and-stdout-from-remote-at-verbose-level-to-stderr.test.sh +++ b/tests/testcases/verbosity/can-log-stderr-and-stdout-from-remote-at-verbose-level-to-stderr.test.sh @@ -1,5 +1,5 @@ # note! tests:ensure will eat '>&2' if it's passed without prefix tests:ensure :orgalorg:with-key -v -C -- echo 1 \; echo err\>\&2 -tests:assert-stderr " 1" -tests:assert-stderr " err" +tests:assert-stderr-re ' \[.*\] 1' +tests:assert-stderr-re ' \[.*\] err' diff --git a/tests/testcases/verbosity/can-output-verbose-debug-info-in-json-format.test.sh b/tests/testcases/verbosity/can-output-verbose-debug-info-in-json-format.test.sh index dec2926..c42d9be 100644 --- a/tests/testcases/verbosity/can-output-verbose-debug-info-in-json-format.test.sh +++ b/tests/testcases/verbosity/can-output-verbose-debug-info-in-json-format.test.sh @@ -2,4 +2,4 @@ tests:ensure :orgalorg:with-key --json -vv -C pwd tests:assert-stderr-re '"stream":"stderr"' tests:assert-stderr-re '"body":".*DEBUG.*connection established' -tests:assert-stderr-re '"body":".*DEBUG.*locking node' +tests:assert-stderr-re '"body":".*TRACE.*running lock command' diff --git a/themes.go b/themes.go new file mode 100644 index 0000000..36bb615 --- /dev/null +++ b/themes.go @@ -0,0 +1,94 @@ +package main + +import ( + "fmt" + + "github.com/kovetskiy/lorg" + "github.com/reconquest/colorgful" + "github.com/reconquest/loreley" +) + +const ( + themeDark = `dark` + themeLight = `light` +) + +var ( + statusBarThemeTemplate = `{bg %d}{fg %d}` + + `{bold}` + + `{if eq .Phase "lock"}{bg %d} LOCK{end}` + + `{if eq .Phase "exec"}{bg %d} EXEC{end}` + + `{if eq .Phase "upload"}{bg %d} UPLOAD{end}` + + `{nobold} ` + + `{from "" %d} ` + + `{fg %d}{bold}{printf "%%4d" .Success}{nobold}{fg %d}` + + `/{printf "%%4d" .Total} ` + + `{if .Fails}{fg %d}✗ {.Fails}{end} ` + + `{from "" %d}` + + `{if eq .Phase "upload"}{fg %d} ` + + `{printf "%%9s/%%s" .Written .Bytes} ` + + `{end}` + + statusBarThemes = map[string]string{ + themeDark: fmt.Sprintf( + statusBarThemeTemplate, + 99, 7, 22, 1, 25, 237, 46, 15, 214, 16, 140, + ), + + themeLight: fmt.Sprintf( + statusBarThemeTemplate, + 99, 7, 22, 1, 64, 254, 106, 16, 9, -1, 140, + ), + } + + logFormat = `${time} ${level:[%s]:right:true} %s` +) + +func getLoggerTheme(theme string) (lorg.Formatter, error) { + switch theme { + case "dark": + return colorgful.ApplyDefaultTheme( + logFormat, + colorgful.Dark, + ) + case "light": + return colorgful.ApplyDefaultTheme( + logFormat, + colorgful.Light, + ) + default: + return colorgful.Format(theme) + } +} + +func getStatusBarTheme(theme string) (*loreley.Style, error) { + if format, ok := statusBarThemes[theme]; ok { + theme = format + } + + style, err := loreley.CompileWithReset(theme, nil) + if err != nil { + return nil, err + } + + return style, nil +} + +func parseTheme(target string, args map[string]interface{}) string { + var ( + theme = args["--"+target+"-format"].(string) + light = args["--light"].(bool) + dark = args["--dark"].(bool) + ) + + switch { + case light: + return themeLight + + case dark: + return themeDark + + default: + return theme + } +}