diff --git a/packages/riverpod/CHANGELOG.md b/packages/riverpod/CHANGELOG.md index 5ded22860..1452ccaab 100644 --- a/packages/riverpod/CHANGELOG.md +++ b/packages/riverpod/CHANGELOG.md @@ -24,6 +24,7 @@ - **Breaking**: A provider is now considered "paused" if all of its listeners are also paused. So if a provider `A` is watched _only_ by a provider `B`, and `B` is currently unused, then `A` will be paused. +- Fix `StreamProvider` not cancelling the `StreamSubscription` if the stream is never emitted any value. - All `Ref` life-cycles (such as `Ref.onDispose`) and `Notifier.listenSelf` now return a function to remove the listener. - Added methods to `ProviderObserver` for listening to "mutations". diff --git a/packages/riverpod/lib/src/core/modifiers/future.dart b/packages/riverpod/lib/src/core/modifiers/future.dart index 757518867..1391ce31f 100644 --- a/packages/riverpod/lib/src/core/modifiers/future.dart +++ b/packages/riverpod/lib/src/core/modifiers/future.dart @@ -3,9 +3,13 @@ part of '../../framework.dart'; /// Internal typedef for cancelling the subscription to an async operation @internal typedef AsyncSubscription = ({ + /// The provider was disposed, but may rebuild later void Function() cancel, void Function()? pause, void Function()? resume, + + /// The provider was disposed + void Function()? abort, }); /// Implementation detail of `riverpod_generator`. @@ -422,10 +426,13 @@ mixin FutureModifierElement on ProviderElement> { }, ); - last(futureOr, cancel); + last(futureOr); return ( cancel: cancel, + // We don't call `cancel` here to let `provider.future` resolve with + // the last value emitted by the future. + abort: null, pause: null, resume: null, ); @@ -438,7 +445,7 @@ mixin FutureModifierElement on ProviderElement> { required void Function(StateT) data, required void Function(Object, StackTrace) error, required void Function() done, - required void Function(Future, void Function()) last, + required void Function(Future) last, }) listen, { required bool seamless, }) { @@ -450,12 +457,12 @@ mixin FutureModifierElement on ProviderElement> { } try { - final sub = _cancelSubscription = listen( + _cancelSubscription = listen( data: (value) { onData(AsyncData(value), seamless: seamless); }, error: callOnError, - last: (last, sub) { + last: (last) { assert(_lastFuture == null, 'bad state'); _lastFuture = last; }, @@ -463,10 +470,6 @@ mixin FutureModifierElement on ProviderElement> { _lastFuture = null; }, ); - assert( - sub == null || _lastFuture != null, - 'An async operation is pending but the state for provider.future was not initialized.', - ); } catch (error, stackTrace) { callOnError(error, stackTrace); } @@ -492,17 +495,7 @@ mixin FutureModifierElement on ProviderElement> { final lastFuture = _lastFuture; if (lastFuture != null) { - final cancelSubscription = _cancelSubscription; - if (cancelSubscription != null) { - cancelSubscription.resume?.call(); - lastFuture - .then( - (_) {}, - // ignore: avoid_types_on_closure_parameters - onError: (Object _) {}, - ) - .whenComplete(cancelSubscription.cancel); - } + _cancelSubscription?.abort?.call(); // Prevent super.dispose from cancelling the subscription on the "last" // stream value, so that it can be sent to `provider.future`. @@ -535,60 +528,22 @@ mixin FutureModifierElement on ProviderElement> { extension on Stream { AsyncSubscription listenAndTrackLast( - void Function(Future, void Function()) last, { + void Function(Future) last, { required Object Function() lastOrElseError, required void Function(T event) onData, required void Function(Object error, StackTrace stackTrace) onError, required void Function() onDone, }) { - final completer = Completer(); - - Result? result; late StreamSubscription subscription; - subscription = listen( - (event) { - result = Result.data(event); - onData(event); - }, - // ignore: avoid_types_on_closure_parameters - onError: (Object error, StackTrace stackTrace) { - result = Result.error(error, stackTrace); - onError(error, stackTrace); - }, - onDone: () { - if (result != null) { - switch (result!) { - case ResultData(:final state): - completer.complete(state); - case ResultError(:final error, :final stackTrace): - completer.future.ignore(); - completer.completeError(error, stackTrace); - } - } else { - // The error happens after the associated provider is disposed. - // As such, it's normally never read. Reporting this error as uncaught - // would cause too many false-positives. And the edge-cases that - // do reach this error will throw anyway - completer.future.ignore(); - - completer.completeError( - lastOrElseError(), - StackTrace.current, - ); - } - - onDone(); - }, - ); + subscription = listen(onData, onError: onError, onDone: onDone); final asyncSub = ( cancel: subscription.cancel, pause: subscription.pause, - resume: subscription.resume + resume: subscription.resume, + abort: subscription.cancel, ); - last(completer.future, asyncSub.cancel); - return asyncSub; } } diff --git a/packages/riverpod/test/src/providers/stream_notifier_test.dart b/packages/riverpod/test/src/providers/stream_notifier_test.dart index d73d4524b..7ee32bebe 100644 --- a/packages/riverpod/test/src/providers/stream_notifier_test.dart +++ b/packages/riverpod/test/src/providers/stream_notifier_test.dart @@ -33,7 +33,6 @@ void main() { test('closes the StreamSubscription upon disposing the provider', () async { final onCancel = OnCancelMock(); final container = ProviderContainer.test(); - final cancelCompleter = Completer.sync(); final provider = factory.simpleTestProvider((ref, _) { final controller = StreamController(); ref.onDispose(() { @@ -43,10 +42,7 @@ void main() { return DelegatingStream( controller.stream, - onSubscriptionCancel: () { - onCancel.call(); - cancelCompleter.complete(); - }, + onSubscriptionCancel: onCancel.call, ); }); @@ -55,12 +51,9 @@ void main() { container.dispose(); - verifyZeroInteractions(onCancel); - - await expectLater(future, throwsA(42)); - await cancelCompleter.future; - verifyOnly(onCancel, onCancel()); + + await expectLater(future, throwsStateError); }); test('Pauses the Stream when the provider is paused', () { @@ -690,60 +683,37 @@ void main() { () async { final container = ProviderContainer.test(); final completer = Completer.sync(); + addTearDown(() => completer.complete(42)); final provider = factory.simpleTestProvider( (ref, _) => Stream.fromFuture(completer.future), ); final future = container.read(provider.future); - container.dispose(); - - completer.complete(42); + expect(future, throwsA(isStateError)); - await expectLater(future, completion(42)); + container.dispose(); }); test( - 'when disposed during loading, resolves with the error of StreamNotifier.build', - () async { + 'going data > loading while the future is still pending. ' + 'Resolves with error', () async { final container = ProviderContainer.test(); final completer = Completer.sync(); + addTearDown(() => completer.complete(42)); final provider = factory.simpleTestProvider( (ref, _) => Stream.fromFuture(completer.future), ); + container.read(provider); + container.read(provider.notifier).state = const AsyncData(42); + container.read(provider.notifier).state = const AsyncLoading(); + final future = container.read(provider.future); + expect(future, throwsA(isStateError)); container.dispose(); - - completer.completeError(42); - - await expectLater(future, throwsA(42)); }); - test( - 'going data > loading while the future is still pending. ' - 'Resolves with last future result', - () async { - final container = ProviderContainer.test(); - final completer = Completer.sync(); - final provider = factory.simpleTestProvider( - (ref, _) => Stream.fromFuture(completer.future), - ); - - container.read(provider); - container.read(provider.notifier).state = const AsyncData(42); - container.read(provider.notifier).state = const AsyncLoading(); - - final future = container.read(provider.future); - - container.dispose(); - - completer.complete(42); - - await expectLater(future, completion(42)); - }, - ); - test('if going back to loading after future resolved, throws StateError', () async { final container = ProviderContainer.test(); diff --git a/packages/riverpod_generator/test/stream_test.dart b/packages/riverpod_generator/test/stream_test.dart index 729ccc569..7a39f0eb9 100644 --- a/packages/riverpod_generator/test/stream_test.dart +++ b/packages/riverpod_generator/test/stream_test.dart @@ -34,7 +34,7 @@ void main() { ], ); - final result = container.read(publicProvider.future); + final result = container.listen(publicProvider.future, (a, b) {}).read(); expect(await result, 'Hello world'); }); @@ -54,11 +54,15 @@ void main() { ); expect( - await container.read(familyProvider(42, third: .42).future), + await container + .listen(familyProvider(42, third: .42).future, (a, b) {}) + .read(), 'Hello world 42 null 0.42 true null', ); expect( - await container.read(familyProvider(21, third: .21).future), + await container + .listen(familyProvider(21, third: .21).future, (a, b) {}) + .read(), 'Override', ); });