Skip to content

Commit

Permalink
Added more tests and updated the README file
Browse files Browse the repository at this point in the history
  • Loading branch information
rcardin committed May 28, 2024
1 parent 6e9a496 commit 4f8ed2d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
52 changes: 32 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ The library is only available for Scala 3.

## Usage

The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and
Scala 3. Moreover, Java preview features must be enabled in the Scala compiler.
The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and Scala 3. Moreover, Java preview features must be enabled in the Scala compiler.

The main entry point is the `sus4s` package object. The following code snippet shows how to use the library:

Expand All @@ -41,19 +40,15 @@ val result: Int = structured {
println(result) // 85
```

The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top
of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are
Java Virtual Threads.
The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are Java Virtual Threads.

The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes
functions declared with the capability of suspend:
The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes functions declared with the capability of suspend:

```scala 3
def findUserById(id: UserId): Suspend ?=> User
```

Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation,
aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads.
Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation, aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads.

A type alias is available for the `Suspend` capability:

Expand All @@ -67,17 +62,34 @@ So, the above function can be rewritten as:
def findUserById(id: UserId): Suspended[User]
```

Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented
by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and
returns the result of the function.

The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the
thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when:
The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when:

- all the forked tasks complete successfully
- one of the forked tasks throws an exception
- the block throws an exception

## The `Job` Class

Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and returns the result of the function:

```scala 3
val job1: Job[Int] = fork {
Thread.sleep(1000)
42
}
val meaningOfLife: Int = job1.value
```

If you're not interested in the result of the function, you can use the `join` method:

```scala 3
val job1: Job[Int] = fork {
Thread.sleep(1000)
println("The meaning of life is 42")
}
job1.join()
```

The `structured` function is completely transparent to any exception thrown by the block or by any of the forked tasks.

## Canceling a Job
Expand Down Expand Up @@ -109,11 +121,11 @@ queue.toArray should contain theSameElementsInOrderAs List("job2", "job1")
result shouldBe 43
```

Cancellation is collaborative. In the above example, the job `innerCancellableJob` is marked for cancellation by the call
`innerCancellableJob.cancel()`. However, the job is not immediately canceled. The job is canceled when it reaches the first
point operation that can be _interrupted_ by the JVM. Hence, cancellation is based upon the concept of interruption. In
the above example, the `innerCancellableJob` is canceled when it reaches the `Thread.sleep(2000)` operation. If we remove
the `Thread.sleep` operation, the job will never be canceled. A similar behavior is implemented by Kotlin coroutines (see [Kotlin Coroutines - A Comprehensive Introduction / Cancellation](https://blog.rockthejvm.com/kotlin-coroutines-101/#7-cancellation) for further details).
Cancellation is collaborative. In the above example, the job `innerCancellableJob` is marked for cancellation by the call `innerCancellableJob.cancel()`. However, the job is not immediately canceled. The job is canceled when it reaches the first point operation that can be _interrupted_ by the JVM. Hence, cancellation is based upon the concept of interruption. In the above example, the `innerCancellableJob` is canceled when it reaches the `Thread.sleep(2000)` operation. If we remove the `Thread.sleep` operation, the job will never be canceled. A similar behavior is implemented by Kotlin coroutines (see [Kotlin Coroutines - A Comprehensive Introduction / Cancellation](https://blog.rockthejvm.com/kotlin-coroutines-101/#7-cancellation) for further details).

Trying to get the value from a canceled job will throw a `InterruptedException`. However, joining a canceled job will not throw any exception.

You won't pay any additional cost for canceling a job. The cancellation mechanism is based on the interruption of the virtual thread. No new structured scope is created for the cancellation mechanism.

## Contributing

Expand Down
8 changes: 7 additions & 1 deletion core/src/main/scala/in/rcard/sus4s/sus4s.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package in.rcard.sus4s

import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure
import java.util.concurrent.{CompletableFuture, StructuredTaskScope}
import scala.concurrent.ExecutionException

object sus4s {

Expand Down Expand Up @@ -31,7 +32,11 @@ object sus4s {
private val cf: CompletableFuture[A],
private val executingThread: CompletableFuture[Thread]
) {
def value: A = cf.get()
def value: A = try cf.get()
catch
case exex: ExecutionException => throw exex.getCause
case throwable: Throwable => throw throwable

def join(): Unit =
cf.handle((_, throwable) => {
throwable match {
Expand All @@ -40,6 +45,7 @@ object sus4s {
case _ => throw throwable
}
})

def cancel(): Suspend ?=> Unit = {
// FIXME Refactor this code
_cancel(executingThread.get(), summon[Suspend].asInstanceOf[SuspendScope].relationships)
Expand Down
29 changes: 29 additions & 0 deletions core/src/test/scala/StructuredSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,5 +221,34 @@ class StructuredSpec extends AnyFlatSpec with Matchers {
result shouldBe 43
}

it should "not throw any exception when joining a cancelled job" in {
val expected = structured {
val cancellable = fork {
while (true) {
Thread.sleep(2000)
}
}
Thread.sleep(500)
cancellable.cancel()
cancellable.join()
42
}

expected shouldBe 42
}

it should "throw an exception when asking for the value of a cancelled job" in {
assertThrows[InterruptedException] {
structured {
val cancellable = fork {
while (true) {
Thread.sleep(2000)
}
}
Thread.sleep(500)
cancellable.cancel()
cancellable.value
}
}
}
}

0 comments on commit 4f8ed2d

Please sign in to comment.