From 4f8ed2dda1fd0292d4855ba3858d6c7528f13765 Mon Sep 17 00:00:00 2001 From: rcardin Date: Tue, 28 May 2024 10:22:55 +0200 Subject: [PATCH] Added more tests and updated the README file --- README.md | 52 ++++++++++++------- .../src/main/scala/in/rcard/sus4s/sus4s.scala | 8 ++- core/src/test/scala/StructuredSpec.scala | 29 +++++++++++ 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index bdec733..f89c906 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,7 @@ The library is only available for Scala 3. ## Usage -The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and -Scala 3. Moreover, Java preview features must be enabled in the Scala compiler. +The library provides a direct-style API for the structured concurrency of Project Loom. The library requires JDK 21 and Scala 3. Moreover, Java preview features must be enabled in the Scala compiler. The main entry point is the `sus4s` package object. The following code snippet shows how to use the library: @@ -41,19 +40,15 @@ val result: Int = structured { println(result) // 85 ``` -The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top -of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are -Java Virtual Threads. +The `structured` method creates a new structured concurrency scope represented by the `Suspend` trait. It's built on top of the `java.util.concurrent.StructuredTaskScope` class. Hence, the threads forked inside the `structured` block are Java Virtual Threads. -The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes -functions declared with the capability of suspend: +The `fork` method creates a new Java Virtual Thread that executes the given block of code. The `fork` method executes functions declared with the capability of suspend: ```scala 3 def findUserById(id: UserId): Suspend ?=> User ``` -Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation, -aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads. +Coloring a function with the `Suspend` capability tells the caller that the function performs a suspendable operation, aka some effect. Suspension is managed by the Loom runtime, which is responsible for scheduling the virtual threads. A type alias is available for the `Suspend` capability: @@ -67,17 +62,34 @@ So, the above function can be rewritten as: def findUserById(id: UserId): Suspended[User] ``` -Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented -by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and -returns the result of the function. - -The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the -thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when: +The `structured` function uses structured concurrency to run the suspendable tasks. In detail, it ensures that the thread executing the block waits for the completion of all the forked tasks. The structured blocks terminates when: - all the forked tasks complete successfully - one of the forked tasks throws an exception - the block throws an exception +## The `Job` Class + +Forking a suspendable function means creating a new virtual thread that executes the function. The thread is represented by the `Job` class. The `Job` class provides the `value` method that waits for the completion of the virtual thread and returns the result of the function: + +```scala 3 +val job1: Job[Int] = fork { + Thread.sleep(1000) + 42 +} +val meaningOfLife: Int = job1.value +``` + +If you're not interested in the result of the function, you can use the `join` method: + +```scala 3 +val job1: Job[Int] = fork { + Thread.sleep(1000) + println("The meaning of life is 42") +} +job1.join() +``` + The `structured` function is completely transparent to any exception thrown by the block or by any of the forked tasks. ## Canceling a Job @@ -109,11 +121,11 @@ queue.toArray should contain theSameElementsInOrderAs List("job2", "job1") result shouldBe 43 ``` -Cancellation is collaborative. In the above example, the job `innerCancellableJob` is marked for cancellation by the call -`innerCancellableJob.cancel()`. However, the job is not immediately canceled. The job is canceled when it reaches the first -point operation that can be _interrupted_ by the JVM. Hence, cancellation is based upon the concept of interruption. In -the above example, the `innerCancellableJob` is canceled when it reaches the `Thread.sleep(2000)` operation. If we remove -the `Thread.sleep` operation, the job will never be canceled. A similar behavior is implemented by Kotlin coroutines (see [Kotlin Coroutines - A Comprehensive Introduction / Cancellation](https://blog.rockthejvm.com/kotlin-coroutines-101/#7-cancellation) for further details). +Cancellation is collaborative. In the above example, the job `innerCancellableJob` is marked for cancellation by the call `innerCancellableJob.cancel()`. However, the job is not immediately canceled. The job is canceled when it reaches the first point operation that can be _interrupted_ by the JVM. Hence, cancellation is based upon the concept of interruption. In the above example, the `innerCancellableJob` is canceled when it reaches the `Thread.sleep(2000)` operation. If we remove the `Thread.sleep` operation, the job will never be canceled. A similar behavior is implemented by Kotlin coroutines (see [Kotlin Coroutines - A Comprehensive Introduction / Cancellation](https://blog.rockthejvm.com/kotlin-coroutines-101/#7-cancellation) for further details). + +Trying to get the value from a canceled job will throw a `InterruptedException`. However, joining a canceled job will not throw any exception. + +You won't pay any additional cost for canceling a job. The cancellation mechanism is based on the interruption of the virtual thread. No new structured scope is created for the cancellation mechanism. ## Contributing diff --git a/core/src/main/scala/in/rcard/sus4s/sus4s.scala b/core/src/main/scala/in/rcard/sus4s/sus4s.scala index e9ee06d..6d5e690 100644 --- a/core/src/main/scala/in/rcard/sus4s/sus4s.scala +++ b/core/src/main/scala/in/rcard/sus4s/sus4s.scala @@ -2,6 +2,7 @@ package in.rcard.sus4s import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure import java.util.concurrent.{CompletableFuture, StructuredTaskScope} +import scala.concurrent.ExecutionException object sus4s { @@ -31,7 +32,11 @@ object sus4s { private val cf: CompletableFuture[A], private val executingThread: CompletableFuture[Thread] ) { - def value: A = cf.get() + def value: A = try cf.get() + catch + case exex: ExecutionException => throw exex.getCause + case throwable: Throwable => throw throwable + def join(): Unit = cf.handle((_, throwable) => { throwable match { @@ -40,6 +45,7 @@ object sus4s { case _ => throw throwable } }) + def cancel(): Suspend ?=> Unit = { // FIXME Refactor this code _cancel(executingThread.get(), summon[Suspend].asInstanceOf[SuspendScope].relationships) diff --git a/core/src/test/scala/StructuredSpec.scala b/core/src/test/scala/StructuredSpec.scala index 6b0c9ca..06a2332 100644 --- a/core/src/test/scala/StructuredSpec.scala +++ b/core/src/test/scala/StructuredSpec.scala @@ -221,5 +221,34 @@ class StructuredSpec extends AnyFlatSpec with Matchers { result shouldBe 43 } + it should "not throw any exception when joining a cancelled job" in { + val expected = structured { + val cancellable = fork { + while (true) { + Thread.sleep(2000) + } + } + Thread.sleep(500) + cancellable.cancel() + cancellable.join() + 42 + } + expected shouldBe 42 + } + + it should "throw an exception when asking for the value of a cancelled job" in { + assertThrows[InterruptedException] { + structured { + val cancellable = fork { + while (true) { + Thread.sleep(2000) + } + } + Thread.sleep(500) + cancellable.cancel() + cancellable.value + } + } + } }