Skip to content

Commit

Permalink
Rewrite SharedResources
Browse files Browse the repository at this point in the history
I just couldn't get rid of the race condition when starting/stopping the `shareIn` Flow
  • Loading branch information
d4rken committed Jan 30, 2025
1 parent 5531460 commit 4bda0bd
Show file tree
Hide file tree
Showing 13 changed files with 398 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal fun FileHandle.remoteFileHandle(): RemoteFileHandle.Stub = object : Rem

override fun read(fileOffset: Long, array: ByteArray, arrayOffset: Int, byteCount: Int): Int = try {
this@remoteFileHandle.read(fileOffset, array, arrayOffset, byteCount).also {
if (Bugs.isTraceDeepDive) {
if (Bugs.isDive) {
log(VERBOSE) { "read(rootside-p): $fileOffset, ${array.size}, $arrayOffset, $byteCount, read $it into ${array.toHex()}" }
}
}
Expand Down Expand Up @@ -73,7 +73,7 @@ internal fun RemoteFileHandle.fileHandle(readWrite: Boolean): FileHandle = objec
@Throws(IOException::class)
override fun protectedRead(fileOffset: Long, array: ByteArray, arrayOffset: Int, byteCount: Int): Int = try {
this@fileHandle.read(fileOffset, array, arrayOffset, byteCount).also {
if (Bugs.isTraceDeepDive) {
if (Bugs.isDive) {
log(VERBOSE) { "read(appside-p): $fileOffset, ${array.size}, $arrayOffset, $byteCount, read $it into ${array.toHex()}" }
}
if (it == -2) throw IOException("Remote Exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class RootServiceClient @Inject constructor(
private val pkgOpsClientFactory: PkgOpsClient.Factory,
private val shellOpsClientFactory: ShellOpsClient.Factory,
) : SharedResource<RootServiceClient.Connection>(
TAG,
coroutineScope,
callbackFlow {
tag = TAG,
parentScope = coroutineScope,
source = callbackFlow {
log(TAG) { "Instantiating Root launcher..." }
if (rootSettings.useRoot.value() != true) throw RootUnavailableException("Root is not enabled")

Expand Down Expand Up @@ -99,7 +99,6 @@ class RootServiceClient @Inject constructor(
)
)
}

) {

data class Connection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ class UserManager2 @Inject constructor(
log(TAG) { "allUsers(): shellMode=$shellMode" }

if (shellMode != null) {
val shellResult = shellOps.execute(ShellOpsCmd("pm list users"), shellMode)
if (shellResult.isSuccess) {
try {
val shellResult = shellOps.execute(ShellOpsCmd("pm list users"), shellMode)
log(TAG) { "allUser() result: $shellResult" }
if (!shellResult.isSuccess) throw IllegalStateException("allUser() failed: ")

shellResult.output
.mapNotNull { userListRegex.matchEntire(it) }
.mapNotNull { match ->
Expand All @@ -73,6 +76,8 @@ class UserManager2 @Inject constructor(
}
}
.run { profiles.addAll(this) }
} catch (e: Exception) {
log(TAG, ERROR) { "allUsers(): Lookup failed ${e.asLog()}" }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class RootIPC @AssistedInject constructor(
}

override fun bye(self: IBinder) {
log(TAG) { "self(self=$self)" }
log(TAG) { "bye(self=$self)" }
// The non-root process is either informing us it is going away, or it already died
synchronized(connections) {
getConnection(self)?.let { conn ->
Expand Down Expand Up @@ -205,6 +205,7 @@ class RootIPC @AssistedInject constructor(
log { "pruneConnections() $con: isBinderAlive=$it" }
}
}
connections.forEach { log { "Remaining connection after pruning: $it" } }
if (!connectionSeen && connections.size > 0) {
connectionSeen = true
synchronized(helloWaiter) { helloWaiter.notifyAll() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,39 @@ class FlowShell(
val session: Flow<Session> = sessionProducer

data class Session(
private val session: FlowProcess.Session,
internal val session: FlowProcess.Session,
) {

private val _tag = "$TAG:${session.id}"

private val writer by lazy {
OutputStreamWriter(session.input, StandardCharsets.UTF_8)
}

private fun InputStream.lineHarvester(tag: String) = flow {
if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) is active" }
if (isDebug) log(_tag, VERBOSE) { "Harverster($tag) is active" }
bufferedReader().use { reader ->
reader.lines().consumeAsFlow().collect {
if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) -> $it" }
if (isDebug) log(_tag, VERBOSE) { "Harverster($tag) -> $it" }
emit(it)
}
}
if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) is finished" }
if (isDebug) log(_tag, VERBOSE) { "Harverster($tag) is finished" }
}.flowOn(Dispatchers.IO)

val output: Flow<String> = session.output!!.lineHarvester("output")

val error: Flow<String> = session.errors!!.lineHarvester("error")

suspend fun write(line: String, flush: Boolean = true) = withContext(Dispatchers.IO) {
if (isDebug) log(TAG) { "write(line=$line, flush=$flush)" }
writer.write(line + System.lineSeparator())
if (flush) writer.flush()
if (isDebug) log(_tag) { "write(line=$line, flush=$flush)" }
try {
writer.write(line + System.lineSeparator())
if (flush) writer.flush()
} catch (e: Exception) {
log(_tag, WARN) { "write($line,$flush) failed: $e" }
throw e
}
}

val exitCode: Flow<FlowProcess.ExitCode?>
Expand All @@ -94,18 +101,18 @@ class FlowShell(
}

suspend fun cancel() = withContext(Dispatchers.IO) {
if (isDebug) log(TAG) { "kill()" }
if (isDebug) log(_tag) { "kill()" }
session.cancel()
}

suspend fun close() = withContext(Dispatchers.IO) {
if (isDebug) log(TAG) { "close()" }
if (isDebug) log(_tag) { "close()" }
write("exit")
waitFor()
}
}

companion object {
private const val TAG = "FS:FlowShell"
private val TAG = "${FlowShellDebug.tag}:FlowShell"
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eu.darken.flowshell.core.cmd

import eu.darken.flowshell.core.FlowShell
import eu.darken.flowshell.core.FlowShellDebug
import eu.darken.flowshell.core.FlowShellDebug.isDebug
import eu.darken.flowshell.core.process.FlowProcess
import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE
Expand Down Expand Up @@ -61,6 +62,7 @@ class FlowCmdShell(
data class Session(
private val session: FlowShell.Session,
) {
private val _tag = "${TAG}:${session.session.id}"
private val scope = CoroutineScope(Job() + Dispatchers.IO)
private val mutex = Mutex()

Expand All @@ -73,13 +75,13 @@ class FlowCmdShell(
suspend fun waitFor() = session.waitFor()

suspend fun cancel() = withContext(Dispatchers.IO) {
if (isDebug) log(TAG) { "kill()" }
if (isDebug) log(_tag) { "kill()" }
session.cancel()
scope.cancel()
}

suspend fun close() = withContext(Dispatchers.IO) {
if (isDebug) log(TAG) { "close()" }
if (isDebug) log(_tag) { "close()" }
session.close()
scope.cancel()
}
Expand All @@ -93,56 +95,56 @@ class FlowCmdShell(
val id = UUID.randomUUID().toString()
val idStart = "$id-start"
val idEnd = "$id-end"
log(TAG, VERBOSE) { "submit($cmdCount): $cmd" }
log(_tag, VERBOSE) { "submit($cmdCount): $cmd" }

val output = mutableListOf<String>()
val outputReady = CompletableDeferred<Unit>()
val outputJob = sharedOutput
.onSubscription {
outputReady.complete(Unit)
if (isDebug) log(TAG, VERBOSE) { "Output monitor started ($id)" }
if (isDebug) log(_tag, VERBOSE) { "Output monitor started ($id)" }
}
.dropWhile { it != idStart }.drop(1)
.onEach {
if (isDebug) log(TAG, VERBOSE) { "Adding (output-$id) $it" }
if (isDebug) log(_tag, VERBOSE) { "Adding (output-$id) $it" }
output.add(it)
}
.takeWhile { !it.startsWith(idEnd) }
.onCompletion { if (isDebug) log(TAG, VERBOSE) { "Output monitor finished ($id)" } }
.onCompletion { if (isDebug) log(_tag, VERBOSE) { "Output monitor finished ($id)" } }
.launchIn(this + Dispatchers.IO)

val errors = mutableListOf<String>()
val errorReady = CompletableDeferred<Unit>()
val errorJob = sharedErrors
.onSubscription {
errorReady.complete(Unit)
if (isDebug) log(TAG, VERBOSE) { "Error monitor started ($id)" }
if (isDebug) log(_tag, VERBOSE) { "Error monitor started ($id)" }
}
.dropWhile { it != idStart }.drop(1)
.takeWhile { it != idEnd }
.onEach {
if (isDebug) log(TAG, VERBOSE) { "Adding (errors-$id) $it" }
if (isDebug) log(_tag, VERBOSE) { "Adding (errors-$id) $it" }
errors.add(it)
}
.onCompletion { if (isDebug) log(TAG, VERBOSE) { "Error monitor finished ($id)" } }
.onCompletion { if (isDebug) log(_tag, VERBOSE) { "Error monitor finished ($id)" } }
.launchIn(this + Dispatchers.IO)

listOf(outputReady, errorReady).awaitAll()

if (isDebug) log(TAG, VERBOSE) { "Harvesters are ready, writing commands... ($id)" }
if (isDebug) log(_tag, VERBOSE) { "Harvesters are ready, writing commands... ($id)" }

session.write("echo $idStart", false)
session.write("echo $idStart >&2", false)
cmd.instructions.forEach { session.write(it, flush = false) }
session.write("echo $idEnd $?", false)
session.write("echo $idEnd >&2", true)

if (isDebug) log(TAG, VERBOSE) { "Commands are written, waiting... ($id)" }
if (isDebug) log(_tag, VERBOSE) { "Commands are written, waiting... ($id)" }

listOf(outputJob, errorJob).joinAll()

if (isDebug) log(TAG, VERBOSE) { "Determining exitcode ($id)" }
val rawExitCodeRow = output.removeLast()
if (isDebug) log(_tag, VERBOSE) { "Determining exitcode ($id)" }
val rawExitCodeRow = output.removeAt(output.lastIndex)

val exitCode = rawExitCodeRow
.split(" ")
Expand All @@ -155,12 +157,12 @@ class FlowCmdShell(
exitCode = exitCode,
output = output,
errors = errors
).also { log(TAG) { "submit($cmdCount): $cmd -> $it" } }
).also { log(_tag) { "submit($cmdCount): $cmd -> $it" } }
}
}
}

companion object {
private const val TAG = "FS:FlowCmdShell"
private val TAG = "${FlowShellDebug.tag}:FlowCmdShell"
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.darken.flowshell.core.process

import eu.darken.flowshell.core.FlowShellDebug
import eu.darken.flowshell.core.FlowShellDebug.isDebug
import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR
import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE
Expand All @@ -20,6 +21,7 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException

class FlowProcess(
Expand All @@ -28,54 +30,57 @@ class FlowProcess(
) {

private val processCreator = callbackFlow {
if (isDebug) log(TAG, VERBOSE) { "Launching..." }
val shortId = UUID.randomUUID().toString().takeLast(4)
val _tag = "$TAG:$shortId"
if (isDebug) log(_tag, VERBOSE) { "Launching..." }
val process = launch()
if (isDebug) log(TAG, VERBOSE) { "Launched!" }
if (isDebug) log(_tag, VERBOSE) { "Launched!" }

val processExitCode = MutableStateFlow<ExitCode?>(null)

val killRoutine: suspend () -> Unit = {
if (isDebug) log(_tag, VERBOSE) { "killRoutine is executing" }
try {
kill(process)
} catch (e: Exception) {
log(TAG, ERROR) { "sessionKill threw up: ${e.asLog()}" }
log(_tag, ERROR) { "sessionKill threw up: ${e.asLog()}" }
throw e
}
}

val session = Session(
id = shortId,
process = process,
exitCode = processExitCode,
onKill = {
if (isDebug) log(TAG) { "Kill session due to kill()..." }
if (isDebug) log(_tag) { "Kill session due to kill()..." }
killRoutine()
}
)

// Send session first
if (isDebug) log(TAG) { "Emitting session: $session" }
if (isDebug) log(_tag) { "Emitting session: $session" }
send(session)

// Otherwise we could already have closed, if the process is short
launch(Dispatchers.IO + NonCancellable) {
if (isDebug) log(TAG, VERBOSE) { "Exit-monitor: Waiting for process finish" }
if (isDebug) log(_tag, VERBOSE) { "Exit-monitor: Waiting for process finish" }
val code = process.waitFor().let { ExitCode(it) }
if (isDebug) log(TAG) { "Exit-monitor: Process finished with $code" }
if (isDebug) log(_tag) { "Exit-monitor: Process finished with $code" }
processExitCode.value = code
[email protected]()
}

if (isDebug) log(TAG, VERBOSE) { "Waiting for flow to close..." }
if (isDebug) log(_tag, VERBOSE) { "Waiting for flow to close..." }
awaitClose {
if (isDebug) log(TAG, VERBOSE) { "Flow is closing..." }
if (isDebug) log(_tag, VERBOSE) { "awaitClose() passed, flow is closing..." }
runBlocking {
killRoutine()

if (isDebug) log(TAG) { "Waiting for process to be terminate" }
if (isDebug) log(_tag) { "kill() executed, waiting for process to terminate" }
process.waitFor()
if (isDebug) log(TAG) { "Process has terminated" }
if (isDebug) log(_tag) { "Process has terminated" }
}
if (isDebug) log(TAG, VERBOSE) { "Flow is closed!" }
if (isDebug) log(_tag, VERBOSE) { "Flow is closed!" }
}
}

Expand All @@ -93,6 +98,7 @@ class FlowProcess(
}

data class Session(
internal val id: String,
private val process: Process,
val exitCode: Flow<ExitCode?>,
private val onKill: suspend () -> Unit,
Expand All @@ -119,6 +125,6 @@ class FlowProcess(
}

companion object {
private const val TAG = "FS:FlowProcess"
private val TAG = "${FlowShellDebug.tag}:FlowProcess"
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.darken.flowshell.core.process

import eu.darken.flowshell.core.FlowShellDebug
import eu.darken.flowshell.core.FlowShellDebug.isDebug
import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR
import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE
Expand All @@ -16,7 +17,7 @@ import java.io.InputStream
import java.io.OutputStreamWriter
import java.util.regex.Pattern

private const val TAG = "FS:FlowProcess:Extensions"
private val TAG = "${FlowShellDebug.tag}:FlowProcess:Extensions"
private val PID_PATTERN = Pattern.compile("^.+?pid=(\\d+).+?$")
private val SPACES_PATTERN = Pattern.compile("\\s+")

Expand Down
Loading

0 comments on commit 4bda0bd

Please sign in to comment.