From 01af5b91a54f30ddd121258e81aff2ddc2a99ff9 Mon Sep 17 00:00:00 2001 From: Philipp Zagar Date: Wed, 28 Feb 2024 18:09:54 -0800 Subject: [PATCH] Add AsyncSemaphore (#7) # Add AsyncSemaphore ## :recycle: Current situation & Problem SpeziBluetooth (and @Supereg) implemented an `AsyncSemaphore`: https://github.com/StanfordSpezi/SpeziBluetooth/blob/main/Sources/SpeziBluetooth/Utils/AsyncSempahore.swift This functionality should be moved to the foundation package. ## :gear: Release Notes - Add `AsyncSemaphore` to SpeziFoundation package ## :books: Documentation Added proper docs ## :white_check_mark: Testing - Added test cases, adjusted from https://github.com/groue/Semaphore ## :pencil: Code of Conduct & Contributing Guidelines By submitting creating this pull request, you agree to follow our [Code of Conduct](https://github.com/StanfordSpezi/.github/blob/main/CODE_OF_CONDUCT.md) and [Contributing Guidelines](https://github.com/StanfordSpezi/.github/blob/main/CONTRIBUTING.md): - [x] I agree to follow the [Code of Conduct](https://github.com/StanfordSpezi/.github/blob/main/CODE_OF_CONDUCT.md) and [Contributing Guidelines](https://github.com/StanfordSpezi/.github/blob/main/CONTRIBUTING.md). --- .../Semaphore/AsyncSemaphore.swift | 242 +++++++++++ .../SpeziFoundation.docc/SpeziFoundation.md | 4 + .../AsyncSemaphoreTests.swift | 384 ++++++++++++++++++ 3 files changed, 630 insertions(+) create mode 100644 Sources/SpeziFoundation/Semaphore/AsyncSemaphore.swift create mode 100644 Tests/SpeziFoundationTests/AsyncSemaphoreTests.swift diff --git a/Sources/SpeziFoundation/Semaphore/AsyncSemaphore.swift b/Sources/SpeziFoundation/Semaphore/AsyncSemaphore.swift new file mode 100644 index 0000000..484f17e --- /dev/null +++ b/Sources/SpeziFoundation/Semaphore/AsyncSemaphore.swift @@ -0,0 +1,242 @@ +// +// This source file is part of the Stanford Spezi open-source project +// +// SPDX-FileCopyrightText: 2024 Stanford University and the project authors (see CONTRIBUTORS.md) +// +// SPDX-License-Identifier: MIT +// + +import Foundation + + +/// Asynchronous semaphore for coordinating the concurrent execution of tasks. +/// +/// ``AsyncSemaphore`` provides a mechanism to regulate access to a resource that allows multiple accesses up to a certain limit. +/// Beyond this limit, tasks must wait until the semaphore signals that access is available. It supports both cancellable and non-cancellable waits, +/// enabling tasks to either proceed when the semaphore is available or throw a `CancellationError` if the task was cancelled while waiting. +/// +/// - Note: This semaphore uses Foundation's `NSLock` for thread safety and handles task suspension and resumption internally. +/// +/// ### Usage +/// +/// Initialize ``AsyncSemaphore`` with the maximum number of concurrent accesses allowed: +/// ``` +/// let semaphore = AsyncSemaphore(value: 3) +/// ``` +/// +/// To wait for access (blocking the task until access is available): +/// ``` +/// await semaphore.wait() +/// ``` +/// +/// To wait for access but track cancellations (leading to the throwing of a `CancellationError`): +/// ``` +/// try await semaphore.waitCheckingCancellation() +/// ``` +/// +/// To signal that a task has completed its access, potentially allowing waiting tasks to proceed: +/// ``` +/// semaphore.signal() +/// ``` +/// +/// To signal all waiting tasks to proceed: +/// ``` +/// semaphore.signalAll() +/// ``` +/// +/// To cancel all waiting tasks (only those that support cancellation): +/// ``` +/// semaphore.cancelAll() +/// ``` +/// +/// - Warning: `cancelAll` will trigger a runtime error if it attempts to cancel tasks that are not cancellable. +public final class AsyncSemaphore: @unchecked Sendable { + private enum Suspension { + case cancelable(UnsafeContinuation) + case regular(UnsafeContinuation) + + + func resume() { + switch self { + case let .regular(continuation): + continuation.resume() + case let .cancelable(continuation): + continuation.resume() + } + } + } + + private struct SuspendedTask: Identifiable { + let id: UUID + let suspension: Suspension + } + + + private var value: Int + private var suspendedTasks: [SuspendedTask] = [] + private let nsLock = NSLock() + + + /// Initializes a new semaphore with a given concurrency limit. + /// + /// - Parameter value: The maximum number of concurrent accesses allowed. Must be non-negative. + public init(value: Int = 1) { + precondition(value >= 0) + self.value = value + } + + + /// Decreases the semaphore count and waits if the count is less than zero. + /// + /// Use this method when access to a resource should be awaited without the possibility of cancellation. + public func wait() async { + lock() + + value -= 1 + if value >= 0 { + unlock() + return + } + + await withUnsafeContinuation { continuation in + suspendedTasks.append(SuspendedTask(id: UUID(), suspension: .regular(continuation))) + unlock() + } + } + + /// Decreases the semaphore count and throws a `CancellationError` if the current `Task` is cancelled. + /// + /// This method allows the `Task` calling ``waitCheckingCancellation()`` to be cancelled while waiting, throwing a `CancellationError` if the `Task` is cancelled before it can proceed. + /// + /// - Throws: `CancellationError` if the task is cancelled while waiting. + public func waitCheckingCancellation() async throws { + try Task.checkCancellation() // check if we are already cancelled + + lock() + + do { + // check if we got cancelled while acquiring the lock + try Task.checkCancellation() + } catch { + unlock() + throw error + } + + value -= 1 // decrease the value + if value >= 0 { + unlock() + return + } + + let id = UUID() + + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + if Task.isCancelled { + value += 1 // restore the value + unlock() + + continuation.resume(throwing: CancellationError()) + } else { + suspendedTasks.append(SuspendedTask(id: id, suspension: .cancelable(continuation))) + unlock() + } + } + } onCancel: { + self.lock() + + value += 1 + + guard let index = suspendedTasks.firstIndex(where: { $0.id == id }) else { + preconditionFailure("Inconsistent internal state reached") + } + + let task = suspendedTasks[index] + suspendedTasks.remove(at: index) + + unlock() + + switch task.suspension { + case .regular: + preconditionFailure("Tried to cancel a task that was not cancellable!") + case let .cancelable(continuation): + continuation.resume(throwing: CancellationError()) + } + } + } + + + /// Signals the semaphore, allowing one waiting task to proceed. + /// + /// If there are `Task`s waiting for access, calling this method will resume one of them. + /// + /// - Returns: `true` if a task was resumed, `false` otherwise. + @discardableResult + public func signal() -> Bool { + lock() + + value += 1 + + guard let first = suspendedTasks.first else { + unlock() + return false + } + + suspendedTasks.removeFirst() + unlock() + + first.suspension.resume() + return true + } + + /// Signals the semaphore, allowing all waiting `Task`s to proceed. + /// + /// This method resumes all `Task`s that are currently waiting for access. + public func signalAll() { + lock() + + value += suspendedTasks.count + + let tasks = suspendedTasks + self.suspendedTasks.removeAll() + + unlock() + + for task in tasks { + task.suspension.resume() + } + } + + /// Cancels all waiting `Task`s that can be cancelled. + /// + /// This method attempts to cancel all `Task`s that are currently waiting and support cancellation. `Task`s that do not support cancellation will cause a runtime error. + /// + /// - Warning: Will trigger a runtime error if it attempts to cancel `Task`s that are not cancellable. + public func cancelAll() { + lock() + + value += suspendedTasks.count + + let tasks = suspendedTasks + self.suspendedTasks.removeAll() + + unlock() + + for task in tasks { + switch task.suspension { + case .regular: + preconditionFailure("Tried to cancel a task that was not cancellable!") + case let .cancelable(continuation): + continuation.resume(throwing: CancellationError()) + } + } + } + + private func lock() { + nsLock.lock() + } + + private func unlock() { + nsLock.unlock() + } +} diff --git a/Sources/SpeziFoundation/SpeziFoundation.docc/SpeziFoundation.md b/Sources/SpeziFoundation/SpeziFoundation.docc/SpeziFoundation.md index 6a70e67..3028413 100644 --- a/Sources/SpeziFoundation/SpeziFoundation.docc/SpeziFoundation.md +++ b/Sources/SpeziFoundation/SpeziFoundation.docc/SpeziFoundation.md @@ -23,6 +23,10 @@ Spezi Foundation provides a base layer of functionality useful in many applicati - ``AnyArray`` - ``AnyOptional`` +### Semaphore + +- ``AsyncSemaphore`` + ### Runtime Configuration - `RuntimeConfig` (exposed via the `TestingSupport` SPI target) diff --git a/Tests/SpeziFoundationTests/AsyncSemaphoreTests.swift b/Tests/SpeziFoundationTests/AsyncSemaphoreTests.swift new file mode 100644 index 0000000..0524554 --- /dev/null +++ b/Tests/SpeziFoundationTests/AsyncSemaphoreTests.swift @@ -0,0 +1,384 @@ +// +// This source file is part of the Stanford Spezi open-source project +// +// SPDX-FileCopyrightText: 2024 Stanford University and the project authors (see CONTRIBUTORS.md) +// +// SPDX-License-Identifier: MIT +// +// Tests adopted from https://github.com/groue/Semaphore/blob/main/Sources/Semaphore/AsyncSemaphore.swift. +// + +import Dispatch +@testable import SpeziFoundation +import XCTest + + +final class AsyncSemaphoreTests: XCTestCase { // swiftlint:disable:this type_body_length + func testSignalWithoutSuspendedTasks() { + let dispatchSemZero = DispatchSemaphore(value: 0) + XCTAssertEqual(dispatchSemZero.signal(), 0) + + let dispatchSemOne = DispatchSemaphore(value: 1) + XCTAssertEqual(dispatchSemOne.signal(), 0) + + let dispatchSemTwo = DispatchSemaphore(value: 2) + XCTAssertEqual(dispatchSemTwo.signal(), 0) + + let asyncSemZero = AsyncSemaphore(value: 0) + let wokenZero = asyncSemZero.signal() + XCTAssertFalse(wokenZero) + + let asyncSemOne = AsyncSemaphore(value: 1) + let wokenOne = asyncSemOne.signal() + XCTAssertFalse(wokenOne) + + let asyncSemTwo = AsyncSemaphore(value: 2) + let wokenTwo = asyncSemTwo.signal() + XCTAssertFalse(wokenTwo) + } + + func testSignalReturnsWhetherItResumesSuspendedTask() async throws { + let delay: Duration = .milliseconds(500) + + // Check DispatchSemaphore behavior + do { + // Given a thread waiting for the semaphore + let sem = DispatchSemaphore(value: 0) + Thread { sem.wait() }.start() + try await Task.sleep(for: delay) + + // First signal wakes the waiting thread + XCTAssertNotEqual(sem.signal(), 0) + // Second signal does not wake any thread + XCTAssertEqual(sem.signal(), 0) + } + + // Test that AsyncSemaphore behaves identically + do { + // Given a task suspended on the semaphore + let sem = AsyncSemaphore(value: 0) + Task { await sem.wait() } + try await Task.sleep(for: delay) + + // First signal resumes the suspended task + XCTAssertTrue(sem.signal()) + // Second signal does not resume any task + XCTAssertFalse(sem.signal()) + } + } + + func testWaitSuspendsOnZeroSemaphoreUntilSignal() { + // Check DispatchSemaphore behavior + do { + // Given a zero semaphore + let sem = DispatchSemaphore(value: 0) + + // When a thread waits for this semaphore, + let ex1 = expectation(description: "wait") + ex1.isInverted = true + let ex2 = expectation(description: "woken") + Thread { + sem.wait() + ex1.fulfill() + ex2.fulfill() + }.start() + + // Then the thread is initially blocked. + wait(for: [ex1], timeout: 0.5) + + // When a signal occurs, then the waiting thread is woken. + sem.signal() + wait(for: [ex2], timeout: 1) + } + + // Test that AsyncSemaphore behaves identically + do { + // Given a zero semaphore + let sem = AsyncSemaphore(value: 0) + + // When a task waits for this semaphore, + let ex1 = expectation(description: "wait") + ex1.isInverted = true + let ex2 = expectation(description: "woken") + Task { + await sem.wait() + ex1.fulfill() + ex2.fulfill() + } + + // Then the task is initially suspended. + wait(for: [ex1], timeout: 0.5) + + // When a signal occurs, then the suspended task is resumed. + sem.signal() + wait(for: [ex2], timeout: 0.5) + } + } + + func testCancellationWhileSuspendedThrowsCancellationError() async throws { + let sem = AsyncSemaphore(value: 0) + let exp = expectation(description: "cancellation") + let task = Task { + do { + try await sem.waitCheckingCancellation() + XCTFail("Expected CancellationError") + } catch is CancellationError { + } catch { + XCTFail("Unexpected error") + } + exp.fulfill() + } + try await Task.sleep(for: .milliseconds(100)) + task.cancel() + await fulfillment(of: [exp], timeout: 1) + } + + func testCancellationBeforeSuspensionThrowsCancellationError() throws { + let sem = AsyncSemaphore(value: 0) + let exp = expectation(description: "cancellation") + let task = Task { + // Uncancellable delay + await withUnsafeContinuation { continuation in + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + continuation.resume() + } + } + do { + try await sem.waitCheckingCancellation() + XCTFail("Expected CancellationError") + } catch is CancellationError { + } catch { + XCTFail("Unexpected error") + } + exp.fulfill() + } + task.cancel() + wait(for: [exp], timeout: 5) + } + + func testCancellationWhileSuspendedIncrementsSemaphore() async throws { + // Given a task cancelled while suspended on a semaphore, + let sem = AsyncSemaphore(value: 0) + let task = Task { + try await sem.waitCheckingCancellation() + } + try await Task.sleep(for: .milliseconds(100)) + task.cancel() + + // When a task waits for this semaphore, + let ex1 = expectation(description: "wait") + ex1.isInverted = true + let ex2 = expectation(description: "woken") + Task { + await sem.wait() + ex1.fulfill() + ex2.fulfill() + } + + // Then the task is initially suspended. + await fulfillment(of: [ex1], timeout: 0.5) + + // When a signal occurs, then the suspended task is resumed. + sem.signal() + await fulfillment(of: [ex2], timeout: 0.5) + } + + func testCancellationBeforeSuspensionIncrementsSemaphore() throws { + // Given a task cancelled before it waits on a semaphore, + let sem = AsyncSemaphore(value: 0) + let task = Task { + // Uncancellable delay + await withUnsafeContinuation { continuation in + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + continuation.resume() + } + } + try await sem.waitCheckingCancellation() + } + task.cancel() + + // When a task waits for this semaphore, + let ex1 = expectation(description: "wait") + ex1.isInverted = true + let ex2 = expectation(description: "woken") + Task { + await sem.wait() + ex1.fulfill() + ex2.fulfill() + } + + // Then the task is initially suspended. + wait(for: [ex1], timeout: 0.5) + + // When a signal occurs, then the suspended task is resumed. + sem.signal() + wait(for: [ex2], timeout: 0.5) + } + + func testSemaphoreAsAResourceLimiterOnActorMethod() async { + /// An actor that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + actor Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try? await Task.sleep(for: .milliseconds(100)) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + XCTAssertEqual(effectiveMaxConcurrentRuns, maxConcurrentRuns) + } + } + + func testSemaphoreAsAResourceLimiterOnAsyncMethod() async { + /// A class that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + @MainActor + class Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try? await Task.sleep(for: .milliseconds(100)) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = await Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + XCTAssertEqual(effectiveMaxConcurrentRuns, maxConcurrentRuns) + } + } + + func testSemaphoreAsAResourceLimiterOnSingleThread() async { + /// A class that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + @MainActor + class Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async { + await semaphore.wait() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try? await Task.sleep(for: .milliseconds(100)) + count -= 1 + } + } + + await Task { @MainActor in + let runner = Runner(maxConcurrentRuns: 3) + async let run0: Void = runner.run() + async let run1: Void = runner.run() + async let run2: Void = runner.run() + async let run3: Void = runner.run() + async let run4: Void = runner.run() + async let run5: Void = runner.run() + async let run6: Void = runner.run() + async let run7: Void = runner.run() + async let run8: Void = runner.run() + async let run9: Void = runner.run() + _ = await (run0, run1, run2, run3, run4, run5, run6, run7, run8, run9) + let effectiveMaxConcurrentRuns = runner.effectiveMaxConcurrentRuns + XCTAssertEqual(effectiveMaxConcurrentRuns, 3) + }.value + } + + func testSemaphoreAsAResourceLimiterOnActorMethodWithCancellationSupport() async { + /// An actor that limits the number of concurrent executions of + /// its `run()` method, and counts the effective number of + /// concurrent executions for testing purpose. + actor Runner { + private let semaphore: AsyncSemaphore + private var count = 0 + private(set) var effectiveMaxConcurrentRuns = 0 + + init(maxConcurrentRuns: Int) { + semaphore = AsyncSemaphore(value: maxConcurrentRuns) + } + + func run() async throws { + try await semaphore.waitCheckingCancellation() + defer { semaphore.signal() } + + count += 1 + effectiveMaxConcurrentRuns = max(effectiveMaxConcurrentRuns, count) + try await Task.sleep(for: .milliseconds(100)) + count -= 1 + } + } + + for maxConcurrentRuns in 1...10 { + let runner = Runner(maxConcurrentRuns: maxConcurrentRuns) + + // Spawn many concurrent tasks + await withThrowingTaskGroup(of: Void.self) { group in + for _ in 0..<20 { + group.addTask { + try await runner.run() + } + } + } + + let effectiveMaxConcurrentRuns = await runner.effectiveMaxConcurrentRuns + XCTAssertEqual(effectiveMaxConcurrentRuns, maxConcurrentRuns) + } + } +}