Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JPERF-716 background process results #13

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ The API consists of all public Kotlin types from `com.atlassian.performance.tool
## [Unreleased]
[Unreleased]: https://github.com/atlassian/ssh/compare/release-2.3.1...master

### Added
- Add `Ssh.runInBackground`, which yields `SshResult`s unlike the old `SshConnection.startProcess`. Resolve [JPERF-716].
- Tolerate lack of interrupt if `BackgroundProcess` is already finished.

### Deprecated
- Deprecate `SshConnection.startProcess`, `stopProcess` and `DetachedProcess` in favor of new `BackgroundProcess` APIs.

[JPERF-716]: https://ecosystem.atlassian.net/browse/JPERF-716

## [2.3.1] - 2020-04-06
[2.3.1]: https://github.com/atlassian/ssh/compare/release-2.3.0...release-2.3.1

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.atlassian.performance.tools.ssh

import com.atlassian.performance.tools.ssh.api.BackgroundProcess
import com.atlassian.performance.tools.ssh.api.SshConnection
import net.schmizz.sshj.connection.channel.direct.Session
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

internal class SshjBackgroundProcess(
private val session: Session,
private val command: Session.Command
) : BackgroundProcess {

private var closed = AtomicBoolean(false)

override fun stop(timeout: Duration): SshConnection.SshResult {
tryToInterrupt()
val result = WaitingCommand(command, timeout, Level.DEBUG, Level.DEBUG).waitForResult()
close()
return result
}

private fun tryToInterrupt() {
try {
sendSigint()
} catch (e: Exception) {
LOG.debug("cannot interrupt, if the command doesn't run anymore, then the write connection is closed", e)
}
}

/**
* [Session.Command.signal] doesn't work, so send the CTRL-C character rather than SSH-level SIGINT signal.
* [OpenSSH server was not supporting this standard](https://bugzilla.mindrot.org/show_bug.cgi?id=1424).
* It's supported since 7.9p1 (late 2018), but our test Ubuntu still runs on 7.6p1.
*/
private fun sendSigint() {
val ctrlC = 3
command.outputStream.write(ctrlC);
command.outputStream.flush();
}

override fun close() {
if (!closed.getAndSet(true)) {
command.use {}
session.use {}
}
}

private companion object {
private val LOG = LogManager.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.io.File
import java.io.InputStream
import java.nio.file.Path
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit

/**
* An [SshConnection] based on the [SSHJ library](https://github.com/hierynomus/sshj).
Expand Down Expand Up @@ -63,19 +60,16 @@ internal class SshjConnection internal constructor(
): SshResult {
logger.debug("${sshHost.userName}$ $cmd")
return session.exec(cmd).use { command ->
command.waitForCompletion(cmd, timeout)
SshResult(
exitStatus = command.exitStatus,
output = command.inputStream.readAndLog(stdout),
errorOutput = command.errorStream.readAndLog(stderr)
)
WaitingCommand(command, timeout, stdout, stderr).waitForResult()
}
}

@Suppress("DEPRECATION", "OverridingDeprecatedMember") // used in public API, can only remove in a MAJOR release
override fun startProcess(cmd: String): DetachedProcess {
return ssh.startSession().use { DetachedProcess.start(cmd, it) }
}

@Suppress("DEPRECATION", "OverridingDeprecatedMember") // used in public API, can only remove in a MAJOR release
override fun stopProcess(process: DetachedProcess) {
ssh.startSession().use { process.stop(it) }
}
Expand All @@ -91,62 +85,9 @@ internal class SshjConnection internal constructor(
scpFileTransfer.upload(localSource.absolutePath, remoteDestination)
}

private fun Session.Command.waitForCompletion(
cmd: String,
timeout: Duration
) {
val expectedEnd = Instant.now().plus(timeout)
val extendedTime = timeout.multipliedBy(5).dividedBy(4)
try {
this.join(extendedTime.toMillis(), TimeUnit.MILLISECONDS)
} catch (e: Exception) {
val output = readOutput(cmd)
throw Exception("SSH command failed to finish in extended time ($extendedTime): $output", e)
}
val actualEnd = Instant.now()
if (actualEnd.isAfter(expectedEnd)) {
val overtime = Duration.between(expectedEnd, actualEnd)
throw Exception("SSH command exceeded timeout $timeout by $overtime: '$cmd'")
}
}

private fun Session.Command.readOutput(
cmd: String
): SshExecutedCommand {
return try {
this.close()
SshExecutedCommand(
cmd = cmd,
stdout = this.inputStream.reader().use { it.readText() },
stderr = this.errorStream.reader().use { it.readText() }
)
} catch (e: Exception) {
logger.error("Failed do close ssh channel. Can't get command output", e)
SshExecutedCommand(
cmd = cmd,
stdout = "<couldn't get command stdout>",
stderr = "<couldn't get command stderr>"
)
}
}

private fun InputStream.readAndLog(level: Level): String {
val output = this.reader().use { it.readText() }
if (output.isNotBlank()) {
logger.log(level, output)
}
return output
}

override fun getHost(): SshHost = sshHost

override fun close() {
ssh.close()
}

private data class SshExecutedCommand(
val cmd: String,
val stdout: String,
val stderr: String
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.atlassian.performance.tools.ssh

import com.atlassian.performance.tools.ssh.api.SshConnection
import net.schmizz.sshj.connection.channel.direct.Session
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.io.InputStream
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit

internal class WaitingCommand(
private val command: Session.Command,
private val timeout: Duration,
private val stdout: Level,
private val stderr: Level
) {

fun waitForResult(): SshConnection.SshResult {
command.waitForCompletion(timeout)
return SshConnection.SshResult(
exitStatus = command.exitStatus,
output = command.inputStream.readAndLog(stdout),
errorOutput = command.errorStream.readAndLog(stderr)
)
}

private fun Session.Command.waitForCompletion(
timeout: Duration
) {
val expectedEnd = Instant.now().plus(timeout)
val extendedTime = timeout.multipliedBy(5).dividedBy(4)
try {
this.join(extendedTime.toMillis(), TimeUnit.MILLISECONDS)
} catch (e: Exception) {
val output = readOutput()
throw Exception("SSH command failed to finish in extended time ($extendedTime): $output", e)
}
val actualEnd = Instant.now()
if (actualEnd.isAfter(expectedEnd)) {
val overtime = Duration.between(expectedEnd, actualEnd)
throw Exception("SSH command exceeded timeout $timeout by $overtime")
}
}

private fun Session.Command.readOutput(): SshjExecutedCommand {
return try {
this.close()
SshjExecutedCommand(
stdout = this.inputStream.reader().use { it.readText() },
stderr = this.errorStream.reader().use { it.readText() }
)
} catch (e: Exception) {
LOG.error("Failed do close ssh channel. Can't get command output", e)
SshjExecutedCommand(
stdout = "<couldn't get command stdout>",
stderr = "<couldn't get command stderr>"
)
}
}

private fun InputStream.readAndLog(level: Level): String {
val output = this.reader().use { it.readText() }
if (output.isNotBlank()) {
LOG.log(level, output)
}
return output
}

private data class SshjExecutedCommand(
val stdout: String,
val stderr: String
)

private companion object {
private val LOG: Logger = LogManager.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.atlassian.performance.tools.ssh.api

import java.time.Duration

/**
* Runs in the background. Is independent of `SshConnection`s being closed.
* Can be used for commands, which will not stop on their own, e.g. `tail -f`, `ping`, `top`, etc.
* @since 2.4.0
*/
interface BackgroundProcess : AutoCloseable {

/**
* Interrupts the process, then waits up to [timeout] for its completion.
* Skips the interrupt if the process is already finished.
* Throws if getting the [SshConnection.SshResult] fails.
* Closes the open resources.
*
* @return the result of the stopped process, could have a non-zero exit code
*/
fun stop(timeout: Duration): SshConnection.SshResult
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit
*
* @see [SshConnection.stopProcess]
*/
@Deprecated(message = "Use BackgroundProcess instead")
class DetachedProcess private constructor(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do it in DetachedProcess, but none of the approaches seemed to satisfy all requirements. So I wrote the tests, which guard the requirements. Maybe someone will come in the future and will be able to make them work with DetachedProcess.

But for now, I had to resign from the requirement of starting/stopping on different Session.Commands.

private val cmd: String,
private val uuid: UUID
Expand All @@ -26,6 +27,7 @@ class DetachedProcess private constructor(
logger.debug("Starting process $uuid $cmd")
session.exec("screen -dm bash -c '${savePID(uuid)} && $cmd'")
.use { command -> command.join(15, TimeUnit.SECONDS) }
@Suppress("DEPRECATION") // used transitively by public API
return DetachedProcess(cmd, uuid)
}

Expand All @@ -37,4 +39,4 @@ class DetachedProcess private constructor(
session.exec("kill -3 `cat $dir/$uuid`")
.use { command -> command.join(15, TimeUnit.SECONDS) }
}
}
}
15 changes: 14 additions & 1 deletion src/main/kotlin/com/atlassian/performance/tools/ssh/api/Ssh.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.atlassian.performance.tools.ssh.api
import com.atlassian.performance.tools.jvmtasks.api.ExponentialBackoff
import com.atlassian.performance.tools.jvmtasks.api.IdempotentAction
import com.atlassian.performance.tools.ssh.PerformanceDefaultConfig
import com.atlassian.performance.tools.ssh.SshjBackgroundProcess
import com.atlassian.performance.tools.ssh.SshjConnection
import com.atlassian.performance.tools.ssh.port.LocalPort
import com.atlassian.performance.tools.ssh.port.RemotePort
Expand Down Expand Up @@ -44,6 +45,18 @@ data class Ssh(
)
}

/**
* Runs [cmd] in the background, without waiting for its completion. The returned process can be stopped later.
*
* @since 2.4.0
*/
fun runInBackground(cmd: String): BackgroundProcess {
val session = prepareClient().startSession()
session.allocateDefaultPTY()
val command = session.exec(cmd)
return SshjBackgroundProcess(session, command)
}

/**
* Creates an encrypted connection between a local machine and a remote machine through which you can relay traffic.
*
Expand Down Expand Up @@ -102,4 +115,4 @@ data class Ssh(
)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ interface SshConnection : Closeable {
/**
* Starts a [DetachedProcess]. You can use [stopProcess] to stop it later.
*/
@Deprecated(message = "Use Ssh.runInBackground instead")
fun startProcess(
cmd: String
): DetachedProcess

/**
* Stops a [DetachedProcess].
*/
@Deprecated(message = "Use BackgroundProcess.stop instead")
fun stopProcess(
process: DetachedProcess
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ class SshConnectionTest {
Assert.assertEquals(sshResult.output, "test\n")
}
}
}
}
Loading