Skip to content

Commit

Permalink
Introduce RWLock, ManagedAsynchronousAccess and DataDescriptor (#17)
Browse files Browse the repository at this point in the history
# Introduce RWLock, ManagedAsynchronousAccess and DataDescriptor

## ♻️ Current situation & Problem
This PR adds the `RWLock`, `RecursiveRWLock` and
`ManagedAsynchronousAccess` infrastructure that was previously
introduced in SpeziBluetooth (see
StanfordSpezi/SpeziBluetooth#45 and
StanfordSpezi/SpeziBluetooth#47).
This changes require the Swift 6 toolchain. Therefore, we increase the
required swift tools version to 6.0.

This PR introduces the final changes for the SpeziFoundation 2.0 release
(assuming #16 is merged beforehand).

## ⚙️ Release Notes 
* Added `RWLock` and `RecursiveRWLock`
* Added `ManagedAsynchronousAccess`
* Only require `sending` closure with `withTimeout` instead of a
`@Sendable` one.
* Add new `DataDescriptor` type.


## 📚 Documentation
Updated the documentation catalog, adding a new "Concurrency" topics
section.


## ✅ Testing
Added unit test for the new components.


## 📝 Code of Conduct & Contributing Guidelines 

By submitting creating this pull request, you agree to follow our [Code
of
Conduct](https://github.com/StanfordSpezi/.github/blob/main/CODE_OF_CONDUCT.md)
and [Contributing
Guidelines](https://github.com/StanfordSpezi/.github/blob/main/CONTRIBUTING.md):
- [x] I agree to follow the [Code of
Conduct](https://github.com/StanfordSpezi/.github/blob/main/CODE_OF_CONDUCT.md)
and [Contributing
Guidelines](https://github.com/StanfordSpezi/.github/blob/main/CONTRIBUTING.md).
  • Loading branch information
Supereg authored Oct 17, 2024
1 parent 2506285 commit 5b4ad1b
Show file tree
Hide file tree
Showing 14 changed files with 1,233 additions and 78 deletions.
20 changes: 0 additions & 20 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@ jobs:
scheme: SpeziFoundation
artifactname: SpeziFoundation.xcresult
resultBundle: SpeziFoundation.xcresult
packageios_latest:
name: Build and Test Swift Package iOS Latest
uses: StanfordSpezi/.github/.github/workflows/xcodebuild-or-fastlane.yml@v2
with:
runsonlabels: '["macOS", "self-hosted"]'
scheme: SpeziFoundation
xcodeversion: latest
swiftVersion: 6
artifactname: SpeziFoundation-Latest.xcresult
resultBundle: SpeziFoundation-Latest.xcresult
packagewatchos:
name: Build and Test Swift Package watchOS
uses: StanfordSpezi/.github/.github/workflows/xcodebuild-or-fastlane.yml@v2
Expand Down Expand Up @@ -73,16 +63,6 @@ jobs:
resultBundle: SpeziFoundationMacOS.xcresult
destination: 'platform=macOS,arch=arm64'
artifactname: SpeziFoundationMacOS.xcresult
codeql:
name: CodeQL
uses: StanfordSpezi/.github/.github/workflows/xcodebuild-or-fastlane.yml@v2
with:
codeql: true
test: false
scheme: SpeziFoundation
permissions:
security-events: write
actions: read
uploadcoveragereport:
name: Upload Coverage Report
needs: [packageios, packagewatchos, packagevisionos, packagetvos, packagemacos]
Expand Down
22 changes: 7 additions & 15 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.9
// swift-tools-version:6.0

//
// This source file is part of the Stanford Spezi open-source project
Expand All @@ -12,13 +12,6 @@ import class Foundation.ProcessInfo
import PackageDescription


#if swift(<6)
let swiftConcurrency: SwiftSetting = .enableExperimentalFeature("StrictConcurrency")
#else
let swiftConcurrency: SwiftSetting = .enableUpcomingFeature("StrictConcurrency")
#endif


let package = Package(
name: "SpeziFoundation",
defaultLocalization: "en",
Expand All @@ -32,26 +25,25 @@ let package = Package(
products: [
.library(name: "SpeziFoundation", targets: ["SpeziFoundation"])
],
dependencies: swiftLintPackage(),
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0")
] + swiftLintPackage(),
targets: [
.target(
name: "SpeziFoundation",
dependencies: [
.product(name: "Atomics", package: "swift-atomics")
],
resources: [
.process("Resources")
],
swiftSettings: [
swiftConcurrency
],
plugins: [] + swiftLintPlugin()
),
.testTarget(
name: "SpeziFoundationTests",
dependencies: [
.target(name: "SpeziFoundation")
],
swiftSettings: [
swiftConcurrency
],
plugins: [] + swiftLintPlugin()
)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,16 @@ public final class AsyncSemaphore: Sendable {
/// This method allows the `Task` calling ``waitCheckingCancellation()`` to be cancelled while waiting, throwing a `CancellationError` if the `Task` is cancelled before it can proceed.
///
/// - Throws: `CancellationError` if the task is cancelled while waiting.
public func waitCheckingCancellation() async throws {
try Task.checkCancellation() // check if we are already cancelled
public func waitCheckingCancellation() async throws(CancellationError) {
if Task.isCancelled { // check if we are already cancelled
throw CancellationError()
}

unsafeLock() // this is okay, as the continuation body actually runs sync, so we do no have async code within critical region

do {
// check if we got cancelled while acquiring the lock
try Task.checkCancellation()
} catch {
if Task.isCancelled { // check if we got cancelled while acquiring the lock
unsafeUnlock()
throw error
throw CancellationError()
}

value -= 1 // decrease the value
Expand All @@ -130,37 +129,42 @@ public final class AsyncSemaphore: Sendable {

let id = UUID()

try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in
if Task.isCancelled {
value += 1 // restore the value
unsafeUnlock()

continuation.resume(throwing: CancellationError())
} else {
suspendedTasks.append(SuspendedTask(id: id, suspension: .cancelable(continuation)))
unsafeUnlock()
do {
try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in
if Task.isCancelled {
value += 1 // restore the value
unsafeUnlock()

continuation.resume(throwing: CancellationError())
} else {
suspendedTasks.append(SuspendedTask(id: id, suspension: .cancelable(continuation)))
unsafeUnlock()
}
}
}
} onCancel: {
let task = nsLock.withLock {
value += 1
} onCancel: {
let task = nsLock.withLock {
value += 1

guard let index = suspendedTasks.firstIndex(where: { $0.id == id }) else {
preconditionFailure("Inconsistent internal state reached")
guard let index = suspendedTasks.firstIndex(where: { $0.id == id }) else {
preconditionFailure("Inconsistent internal state reached")
}

let task = suspendedTasks[index]
suspendedTasks.remove(at: index)
return task
}

let task = suspendedTasks[index]
suspendedTasks.remove(at: index)
return task
}

switch task.suspension {
case .regular:
preconditionFailure("Tried to cancel a task that was not cancellable!")
case let .cancelable(continuation):
continuation.resume(throwing: CancellationError())
switch task.suspension {
case .regular:
preconditionFailure("Tried to cancel a task that was not cancellable!")
case let .cancelable(continuation):
continuation.resume(throwing: CancellationError())
}
}
} catch {
assert(error is CancellationError, "Injected unexpected error into continuation: \(error)")
throw CancellationError()
}
}

Expand Down
186 changes: 186 additions & 0 deletions Sources/SpeziFoundation/Concurrency/ManagedAsynchronousAccess.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//
// This source file is part of the Stanford Spezi open-source project
//
// SPDX-FileCopyrightText: 2024 Stanford University and the project authors (see CONTRIBUTORS.md)
//
// SPDX-License-Identifier: MIT
//


/// A continuation with exclusive access.
///
///
public final class ManagedAsynchronousAccess<Value, E: Error> {
private final class CallSiteState {
var wasCancelled = false

init() {}
}

private let access: AsyncSemaphore
private var continuation: CheckedContinuation<Value, E>?
private var associatedState: CallSiteState?

/// Determine if the is currently an ongoing access.
public var ongoingAccess: Bool {
continuation != nil
}

/// Create a new managed asynchronous access.
public init() {
self.access = AsyncSemaphore(value: 1)
}

private func markCancelled() {
if let associatedState {
associatedState.wasCancelled = true
self.associatedState = nil
}
}

/// Resume the continuation by either returning a value or throwing an error.
/// - Parameter result: The result to return from the continuation.
/// - Returns: Returns `true`, if there was another task waiting to access the continuation and it was resumed.
@discardableResult
public func resume(with result: sending Result<Value, E>) -> Bool {
self.associatedState = nil
if let continuation {
self.continuation = nil
let didSignalAnyone = access.signal()
continuation.resume(with: result)
return didSignalAnyone
}

return false
}

/// Resume the continuation by returning a value.
/// - Parameter value: The value to return from the continuation.
/// - Returns: Returns `true`, if there was another task waiting to access the continuation and it was resumed.
@discardableResult
public func resume(returning value: sending Value) -> Bool {
resume(with: .success(value))
}

/// Resume the continuation by throwing an error.
/// - Parameter error: The error that is thrown from the continuation.
/// - Returns: Returns `true`, if there was another task waiting to access the continuation and it was resumed.
@discardableResult
public func resume(throwing error: E) -> Bool {
resume(with: .failure(error))
}
}


extension ManagedAsynchronousAccess where Value == Void {
/// Resume the continuation.
/// - Returns: Returns `true`, if there was another task waiting to access the continuation and it was resumed.
@discardableResult
public func resume() -> Bool {
self.resume(returning: ())
}
}


extension ManagedAsynchronousAccess where E == Error {
/// Perform an managed, asynchronous access.
///
/// Call this method to perform an managed, asynchronous access. This method awaits exclusive access, creates a continuation and
/// calls the provided closure and then suspends until ``resume(with:)`` is called.
///
/// - Parameters:
/// - isolation: Inherits actor isolation from the call site.
/// - action: The action that is executed inside the continuation closure that triggers an asynchronous operation.
/// - Returns: The value from the continuation.
public func perform(
isolation: isolated (any Actor)? = #isolation,
action: () -> Void
) async throws -> Value {
try await access.waitCheckingCancellation()

let state = CallSiteState()

defer {
if state.wasCancelled {
withUnsafeCurrentTask { task in
task?.cancel()
}
}
}

return try await withCheckedThrowingContinuation { continuation in
assert(self.continuation == nil, "continuation was unexpectedly not nil")
self.continuation = continuation
assert(self.associatedState == nil, "associatedState was unexpectedly not nil")
self.associatedState = state
action()
}
}

/// Cancel all ongoing accesses.
///
/// Calling this methods will cancel all tasks that currently await exclusive access and will resume the continuation by throwing a
/// cancellation error.
/// - Parameter error: A custom error that is thrown instead of the cancellation error.
public func cancelAll(error: E? = nil) {
markCancelled()
if let continuation {
self.continuation = nil
continuation.resume(throwing: error ?? CancellationError())
}
access.cancelAll()
}
}


extension ManagedAsynchronousAccess where E == Never {
/// Perform an managed, asynchronous access.
///
/// Call this method to perform an managed, asynchronous access. This method awaits exclusive access, creates a continuation and
/// calls the provided closure and then suspends until ``resume(with:)`` is called.
///
/// - Parameters:
/// - isolation: Inherits actor isolation from the call site.
/// - action: The action that is executed inside the continuation closure that triggers an asynchronous operation.
public func perform(
isolation: isolated (any Actor)? = #isolation,
action: () -> Void
) async throws(CancellationError) -> Value {
try await access.waitCheckingCancellation()

let state = CallSiteState()

let value = await withCheckedContinuation { continuation in
assert(self.continuation == nil, "continuation was unexpectedly not nil")
self.continuation = continuation
assert(self.associatedState == nil, "associatedState was unexpectedly not nil")
self.associatedState = state
action()
}

if state.wasCancelled {
withUnsafeCurrentTask { task in
task?.cancel()
}
throw CancellationError()
}

return value
}
}


extension ManagedAsynchronousAccess where Value == Void, E == Never {
/// Cancel all ongoing accesses.
///
/// Calling this methods will cancel all tasks that currently await exclusive access.
/// The continuation will be resumed. Make sure to propagate cancellation information yourself.
public func cancelAll() {
markCancelled()
if let continuation {
self.continuation = nil
continuation.resume()
}
access.cancelAll()
}
}
Loading

0 comments on commit 5b4ad1b

Please sign in to comment.