You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We have identified a commonly encountered problem in RxSwift where event forwarding is performed under a lock. It leads to deadlocks and unexpected locks on threads, causing issues in code execution. We created a sample that demonstrates two specific problems withtestDeadlock and testTemporaryLock methods within the RxTester class.
Expected outcome:
We assume that event forwarding should be performed outside of the locked sections. However, if there are cases that require forwarding under a lock to guarantee that only one event is forwarded simultaneously, it should use a separate lock.
What actually happens:
The issue of event forwarding under a lock has widespread implications throughout the RxSwift codebase.
Problem 1: testDeadlock
This example illustrates the deadlock caused by a specific combination of Rx operators. The major root causes:
The share(replay: 1) operator performs event forwarding under its lock for a non-first subscriber if there is an event to replay.
The combineLatest operator uses the SynchronizedOnType protocol and performs event forwarding under a lock.
Problem 2: testTemporaryLock
The testTemporaryLock example showcases how a lightweight event generation (e.g., on the main thread) can unexpectedly wait on a lock due to the heavyweight processing of events in a background thread. It happens because some operators (e.g., ‘debounce’) may receive and forward events in different threads but do that under the same lock.
Presumably, similar problems might reproduce with other operators. For example, all classes using ‘SynchronizedOnType’ protocol automatically follow the pattern; some classes not using it might have similar logic (like ShareReplay1WhileConnectedConnection). At the same time, many pieces of RxSwift code perform event forwarding after locked sections, and it looks right.
Self contained code example that reproduces the issue:
import Foundation
import RxSwift
classRxTester{privateletbag=DisposeBag()
// Some observable that starts with an event and doesn't complete instantly.
privatefunc makeSomeObservable<T>(firstValue:T)->Observable<T>{
Observable<T>.create({ observer in
observer.onNext(firstValue)returnDisposables.create()})}
/*
This test demonstrates a deadlock, which is possible because:
* share(replay: 1) performs event forwarding for non-first subscribers under its lock (see ShareReplay1WhileConnectedConnection.synchronized_subscribe(_:))
* combineLatest, using SynchronizedOnType protocol, performs event forwarding under its lock
Here's the scheme of the locked state of the threads:
BG: -sub(1st)-> sharedSequence2 -sub(2nd)-> sharedSequence1 -on[locked]-> sharedSequence2 -on-> combLatest [waiting on lock]**
Current: -sub-> complexSequence -sub-> flatMapLatest -sub-> combLatest -on[locked]*-> flatMapLatest's SwitchSink -sub-> sharedSequence1 [waiting on lock]
* When combLatest is subscribed in the current thread, its 1st component produces a 'stWithValue' and tries to subscribe to sharedSequence2. But since it's not the 1st subscriber, then combLatest (it's 1st component) is only added to sharedSequence2's observers without immediate event (which is not ready yet, because sharedSequence2 is still performing its 'connect' at the moment). So, combLatest subscribes to its 2nd component, receives 'justValue' and starts forwarding its pair of values to flatMapLatest.
** Later, when BG thread completes its slow start of sharedSequence2, it forwards its 1st event to all observers, including the previously subscribed combLatest.
*/
func testDeadlock(){letsharedSequence1=makeSomeObservable(firstValue:"obsValue").share(replay:1)
// We perform a preliminary 1st subscription to sharedSequence1, so that next subscriptions to it will trigger event forwarding under its lock.
sharedSequence1.subscribe{ _ inNSLog("First subscription to sharedSequence1 finished in current thread (got an event)")}.disposed(by: bag)letsharedSequence2= sharedSequence1.do(onNext:{ _ in
// This sleep makes the 1st subscription to sharedSequence2 slow to provide a stable reproducibility.
Thread.sleep(forTimeInterval:0.5)}).share(replay:1)DispatchQueue.global().async{
sharedSequence2.subscribe{ _ inNSLog("First subscription to sharedSequence2 finished in another thread (got an event)")}.disposed(by:self.bag)}
// Sleep to ensure that 1st subscription to sharedSequence2 is in process in another thread.
Thread.sleep(forTimeInterval:0.2)
// startWith allows combineLatest to produce a value even though sharedSequence2 is not ready yet.
letcombLatest=Observable.combineLatest(sharedSequence2.startWith("stWithValue"),Observable.just("justValue"))
// combLatest maps into another sequence depending on the sharedSequence1 (for simplicity it's just sharedSequence1 here).
letcomplexSequence= combLatest.flatMapLatest{ _ inreturn sharedSequence1
}
complexSequence.subscribe{ _ inNSLog("complexSequence produced an event")}.disposed(by: bag)NSLog("testDeadlock has finished (shouldn't happen)")}privatestaticvarpreviousTimerEventDate:Date!
/*
This test demonstrates unexpected temporary locks on the main thread, which happens because:
* debounce operator receives and forwards events under the same lock
*/
func testTemporaryLock(){Self.previousTimerEventDate =.init()letmainThreadEventsGenerator=Observable<Int>.timer(.zero,
period:.seconds(1),
scheduler:MainScheduler.instance).do(onNext:{ event inlettimeSince=Date().timeIntervalSince(Self.previousTimerEventDate)NSLog("Timer produced new event: \(event), time since previous event: \(timeSince)")Self.previousTimerEventDate =.init()})letscheduler=SerialDispatchQueueScheduler(qos:.default)letbackgroundProcessingSubscription= mainThreadEventsGenerator.debounce(.milliseconds(0), scheduler: scheduler).subscribe{ _ inNSLog("Processing next debounced event in thread \(Thread.current)...")
// Imitation of a heavy task.
Thread.sleep(forTimeInterval:5.0)NSLog("Processed")}DispatchQueue.global().asyncAfter(deadline:.now()+30.0){NSLog("Will dispose subscription \(backgroundProcessingSubscription)")
backgroundProcessingSubscription.dispose()}}}
Reproduction Steps:
To reproduce the issue, follow these steps:
Create an instance of RxTester
Call the desired method, such as testTemporaryLock()
let tester = RxTester()
tester.testTemporaryLock()
RxSwift/RxCocoa/RxBlocking/RxTest version/commit
version or commit here
Platform/Environment
iOS
macOS
tvOS
watchOS
playgrounds
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
easy, 100% repro
sometimes, 10%-100%
hard, 2% - 10%
extremely hard, %0 - 2%
Xcode version:
Xcode 14.1
⚠️ Fields below are optional for general issues or in case those questions aren't related to your issue, but filling them out will increase the chances of getting your issue resolved. ⚠️
Installation method:
CocoaPods
Carthage
Git submodules
I have multiple versions of Xcode installed: (so we can know if this is a potential cause of your issue)
yes (which ones)
no
Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)
just starting
I have a small code base
I have a significant code base
The text was updated successfully, but these errors were encountered:
Short description of the issue:
We have identified a commonly encountered problem in RxSwift where event forwarding is performed under a lock. It leads to deadlocks and unexpected locks on threads, causing issues in code execution. We created a sample that demonstrates two specific problems with
testDeadlock
andtestTemporaryLock
methods within theRxTester
class.Expected outcome:
We assume that event forwarding should be performed outside of the locked sections. However, if there are cases that require forwarding under a lock to guarantee that only one event is forwarded simultaneously, it should use a separate lock.
What actually happens:
The issue of event forwarding under a lock has widespread implications throughout the RxSwift codebase.
Problem 1: testDeadlock
This example illustrates the deadlock caused by a specific combination of Rx operators. The major root causes:
share(replay: 1)
operator performs event forwarding under its lock for a non-first subscriber if there is an event to replay.combineLatest
operator uses theSynchronizedOnType
protocol and performs event forwarding under a lock.Problem 2: testTemporaryLock
The testTemporaryLock example showcases how a lightweight event generation (e.g., on the main thread) can unexpectedly wait on a lock due to the heavyweight processing of events in a background thread. It happens because some operators (e.g., ‘debounce’) may receive and forward events in different threads but do that under the same lock.
Presumably, similar problems might reproduce with other operators. For example, all classes using ‘SynchronizedOnType’ protocol automatically follow the pattern; some classes not using it might have similar logic (like ShareReplay1WhileConnectedConnection). At the same time, many pieces of RxSwift code perform event forwarding after locked sections, and it looks right.
Self contained code example that reproduces the issue:
Reproduction Steps:
To reproduce the issue, follow these steps:
RxTester
testTemporaryLock()
RxSwift/RxCocoa/RxBlocking/RxTest version/commit
version or commit here
Platform/Environment
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
Xcode version:
Installation method:
I have multiple versions of Xcode installed: (so we can know if this is a potential cause of your issue)
Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)
The text was updated successfully, but these errors were encountered: