Skip to content

Commit

Permalink
Merge pull request #3 from rcardin/1-add-cancellation-to-jobs
Browse files Browse the repository at this point in the history
Add cancellation to jobs
  • Loading branch information
rcardin authored May 28, 2024
2 parents ba91067 + 5a68107 commit 91daedd
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 30 deletions.
107 changes: 92 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ The library is only available for Scala 3.

## Usage

The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and
Scala 3. Moreover, Java preview features must be enabled in the Scala compiler.
The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and Scala 3. Moreover, Java preview features must be enabled in the Scala compiler.

The main entry point is the `sus4s` package object. The following code snippet shows how to use the library:

Expand All @@ -41,19 +40,15 @@ val result: Int = structured {
println(result) // 85
```

The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top
of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are
Java Virtual Threads.
The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are Java Virtual Threads.

The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes
functions declared with the capability of suspend:
The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes functions declared with the capability of suspend:

```scala 3
def findUserById(id: UserId): Suspend ?=> User
```

Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation,
aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads.
Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation, aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads.

A type alias is available for the `Suspend` capability:

Expand All @@ -67,19 +62,101 @@ So, the above function can be rewritten as:
def findUserById(id: UserId): Suspended[User]
```

Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented
by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and
returns the result of the function.

The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the
thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when:
The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when:

- all the forked tasks complete successfully
- one of the forked tasks throws an exception
- the block throws an exception

## The `Job` Class

Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and returns the result of the function:

```scala 3
val job1: Job[Int] = fork {
Thread.sleep(1000)
42
}
val meaningOfLife: Int = job1.value
```

If you're not interested in the result of the function, you can use the `join` method:

```scala 3
val job1: Job[Int] = fork {
Thread.sleep(1000)
println("The meaning of life is 42")
}
job1.join()
```

The `structured` function is completely transparent to any exception thrown by the block or by any of the forked tasks.

## Canceling a Job

It's possible to cancel a job by calling the `cancel` method on the `Job` instance. The following code snippet shows how:

```scala 3
val queue = new ConcurrentLinkedQueue[String]()
val result = structured {
val job1 = fork {
val innerCancellableJob = fork {
while (true) {
Thread.sleep(2000)
queue.add("cancellable")
}
}
Thread.sleep(1000)
innerCancellableJob.cancel()
queue.add("job1")
}
val job = fork {
Thread.sleep(500)
queue.add("job2")
43
}
job.value
}
queue.toArray should contain theSameElementsInOrderAs List("job2", "job1")
result shouldBe 43
```

Cancellation is collaborative. In the above example, the job `innerCancellableJob` is marked for cancellation by the call `innerCancellableJob.cancel()`. However, the job is not immediately canceled. The job is canceled when it reaches the first point operation that can be _interrupted_ by the JVM. Hence, cancellation is based upon the concept of interruption. In the above example, the `innerCancellableJob` is canceled when it reaches the `Thread.sleep(2000)` operation. If we remove the `Thread.sleep` operation, the job will never be canceled. A similar behavior is implemented by Kotlin coroutines (see [Kotlin Coroutines - A Comprehensive Introduction / Cancellation](https://blog.rockthejvm.com/kotlin-coroutines-101/#7-cancellation) for further details).

Cancelling a job follows the relationship between parent and child jobs. If a parent job is canceled, all the child jobs are canceled as well:

```scala 3
val expectedQueue = structured {
val queue = new ConcurrentLinkedQueue[String]()
val job1 = fork {
val innerJob = fork {
fork {
Thread.sleep(3000)
println("inner-inner-Job")
queue.add("inner-inner-Job")
}
Thread.sleep(2000)
println("innerJob")
queue.add("innerJob")
}
Thread.sleep(1000)
queue.add("job1")
}
val job = fork {
Thread.sleep(500)
job1.cancel()
queue.add("job2")
43
}
queue
}
expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
```

Trying to get the value from a canceled job will throw a `InterruptedException`. However, joining a canceled job will not throw any exception.

**You won't pay any additional cost for canceling a job**. The cancellation mechanism is based on the interruption of the virtual thread. No new structured scope is created for the cancellation mechanism.

## Contributing

If you want to contribute to the project, please do it! Any help is welcome.
Expand Down
119 changes: 104 additions & 15 deletions core/src/main/scala/in/rcard/sus4s/sus4s.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package in.rcard.sus4s

import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure
import java.util.concurrent.{CompletableFuture, StructuredTaskScope}
import scala.concurrent.ExecutionException

object sus4s {

Expand All @@ -11,6 +13,12 @@ object sus4s {
private[sus4s] val scope: StructuredTaskScope[Any]
}

// noinspection ScalaWeakerAccess
final class SuspendScope(override private[sus4s] val scope: StructuredTaskScope[Any])
extends Suspend {
private[sus4s] val relationships = scala.collection.mutable.Map.empty[Thread, List[Thread]]
}

/** A value of type `A` obtained by a function that can suspend */
type Suspended[A] = Suspend ?=> A

Expand All @@ -20,8 +28,81 @@ object sus4s {
* @tparam A
* The type of the value returned by the job
*/
class Job[A] private[sus4s] (private val cf: CompletableFuture[A]) {
def value: A = cf.join()
class Job[A] private[sus4s] (
private val cf: CompletableFuture[A],
private val executingThread: CompletableFuture[Thread]
) {

/** Returns the value of the job. If the job is not completed, the method blocks until the job
* completes. If the job is cancelled, the method throws an [[InterruptedException]].
* @return
* The value of the job
*/
def value: A =
try cf.get()
catch
case exex: ExecutionException => throw exex.getCause
case throwable: Throwable => throw throwable

/** Waits for the completion of the job. If the job is cancelled, no exception is thrown.
*/
def join(): Unit =
cf.handle((_, throwable) => {
throwable match {
case null => ()
case _: InterruptedException => ()
case _ => throw throwable
}
})

/** Cancels the job and all its children jobs. Getting the value of a cancelled job throws an
* [[InterruptedException]]. Cancellation is an idempotent operation.
*
* <h2>Example</h2>
* {{{
* val expectedQueue = structured {
* val queue = new ConcurrentLinkedQueue[String]()
* val job1 = fork {
* val innerJob = fork {
* fork {
* Thread.sleep(3000)
* println("inner-inner-Job")
* queue.add("inner-inner-Job")
* }
* Thread.sleep(2000)
* println("innerJob")
* queue.add("innerJob")
* }
* Thread.sleep(1000)
* queue.add("job1")
* }
* val job = fork {
* Thread.sleep(500)
* job1.cancel()
* queue.add("job2")
* }
* queue
* }
* expectedQueue.toArray should contain theSameElementsInOrderAs List("job2")
* }}}
*/
def cancel(): Suspend ?=> Unit = {
// FIXME Refactor this code
_cancel(executingThread.get(), summon[Suspend].asInstanceOf[SuspendScope].relationships)
cf.completeExceptionally(new InterruptedException("Job cancelled"))
}

private def _cancel(
thread: Thread,
relationships: scala.collection.mutable.Map[Thread, List[Thread]]
): Unit = {
relationships.get(thread) match {
case Some(children) =>
children.foreach(_cancel(_, relationships))
case None => ()
}
thread.interrupt()
}
}

/** Executes a block of code applying structured concurrency to the contained suspendable tasks
Expand Down Expand Up @@ -65,27 +146,24 @@ object sus4s {
* The result of the block
*/
inline def structured[A](inline block: Suspend ?=> A): A = {
val _scope = new StructuredTaskScope.ShutdownOnFailure()

given suspended: Suspend = new Suspend {
override val scope: StructuredTaskScope[Any] = _scope
}
val loomScope = new ShutdownOnFailure()
given suspended: Suspend = SuspendScope(loomScope)

try {
val mainTask = _scope.fork(() => {
val mainTask = loomScope.fork(() => {
block
})
_scope.join().throwIfFailed(identity)
loomScope.join().throwIfFailed(identity)
mainTask.get()
} finally {
_scope.close()
loomScope.close()
}
}

/** Forks a new concurrent task executing the given block of code and returning a [[Job]] that
* completes with the value of type `A`. The task is executed in a new Virtual Thread using the
* given [[Suspend]] context. The job is transparent to any exception thrown by the `block`, which
* means it rethrows the exception.
* given [[Suspend]] context. The job is transparent to any exception thrown by the `block`,
* which means it rethrows the exception.
*
* <h2>Example</h2>
* {{{
Expand All @@ -109,14 +187,25 @@ object sus4s {
* [[structured]]
*/
def fork[A](block: Suspend ?=> A): Suspend ?=> Job[A] = {
val result = new CompletableFuture[A]()
summon[Suspend].scope.fork(() => {
val result = new CompletableFuture[A]()
val executingThread = new CompletableFuture[Thread]()
val suspendScope: SuspendScope = summon[Suspend].asInstanceOf[SuspendScope]
val parentThread: Thread = Thread.currentThread()
suspendScope.scope.fork(() => {
val childThread = Thread.currentThread()
suspendScope.relationships.updateWith(parentThread) {
case None => Some(List(childThread))
case Some(l) => Some(childThread :: l)
}
executingThread.complete(childThread)
try result.complete(block)
catch
case _: InterruptedException =>
result.completeExceptionally(new InterruptedException("Job cancelled"))
case throwable: Throwable =>
result.completeExceptionally(throwable)
throw throwable;
})
Job(result)
Job(result, executingThread)
}
}
Loading

0 comments on commit 91daedd

Please sign in to comment.