Skip to content

Commit

Permalink
Fix-stream-cancel (#3865)
Browse files Browse the repository at this point in the history
## Related Issues

fixes #3853

<!--
  Update to link the issue that is going to be fixed by this.
  Unless this concerns documentation, make sure to create an issue first
  before raising a PR.

  You do not need to describe what this PR is doing, as this should
  already be covered by the associated issue.
  If the linked issue isn't enough, then chances are a new issue
  is needed.

  Don't hesitate to create many issues! This can avoid working
  on something, only to have your PR closed or have to be rewritten
  due to a disagreement/misunderstanding.
 -->

## Checklist

Before you create this PR confirm that it meets all requirements listed
below by checking the relevant checkboxes (`[x]`).

- [x] I have updated the `CHANGELOG.md` of the relevant packages.
      Changelog files must be edited under the form:

  ```md
  ## Unreleased fix/major/minor

  - Description of your change. (thanks to @yourgithubid)
  ```

- [x] If this contains new features or behavior changes,
      I have updated the documentation to match those changes.
  • Loading branch information
rrousselGit authored Dec 9, 2024
1 parent d7b791c commit cc80883
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 108 deletions.
1 change: 1 addition & 0 deletions packages/riverpod/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- **Breaking**: A provider is now considered "paused" if all
of its listeners are also paused. So if a provider `A` is watched _only_ by a provider `B`, and `B` is currently unused,
then `A` will be paused.
- Fix `StreamProvider` not cancelling the `StreamSubscription` if the stream is never emitted any value.
- All `Ref` life-cycles (such as `Ref.onDispose`) and `Notifier.listenSelf`
now return a function to remove the listener.
- Added methods to `ProviderObserver` for listening to "mutations".
Expand Down
77 changes: 16 additions & 61 deletions packages/riverpod/lib/src/core/modifiers/future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ part of '../../framework.dart';
/// Internal typedef for cancelling the subscription to an async operation
@internal
typedef AsyncSubscription = ({
/// The provider was disposed, but may rebuild later
void Function() cancel,
void Function()? pause,
void Function()? resume,

/// The provider was disposed
void Function()? abort,
});

/// Implementation detail of `riverpod_generator`.
Expand Down Expand Up @@ -422,10 +426,13 @@ mixin FutureModifierElement<StateT> on ProviderElement<AsyncValue<StateT>> {
},
);

last(futureOr, cancel);
last(futureOr);

return (
cancel: cancel,
// We don't call `cancel` here to let `provider.future` resolve with
// the last value emitted by the future.
abort: null,
pause: null,
resume: null,
);
Expand All @@ -438,7 +445,7 @@ mixin FutureModifierElement<StateT> on ProviderElement<AsyncValue<StateT>> {
required void Function(StateT) data,
required void Function(Object, StackTrace) error,
required void Function() done,
required void Function(Future<StateT>, void Function()) last,
required void Function(Future<StateT>) last,
}) listen, {
required bool seamless,
}) {
Expand All @@ -450,23 +457,19 @@ mixin FutureModifierElement<StateT> on ProviderElement<AsyncValue<StateT>> {
}

try {
final sub = _cancelSubscription = listen(
_cancelSubscription = listen(
data: (value) {
onData(AsyncData(value), seamless: seamless);
},
error: callOnError,
last: (last, sub) {
last: (last) {
assert(_lastFuture == null, 'bad state');
_lastFuture = last;
},
done: () {
_lastFuture = null;
},
);
assert(
sub == null || _lastFuture != null,
'An async operation is pending but the state for provider.future was not initialized.',
);
} catch (error, stackTrace) {
callOnError(error, stackTrace);
}
Expand All @@ -492,17 +495,7 @@ mixin FutureModifierElement<StateT> on ProviderElement<AsyncValue<StateT>> {

final lastFuture = _lastFuture;
if (lastFuture != null) {
final cancelSubscription = _cancelSubscription;
if (cancelSubscription != null) {
cancelSubscription.resume?.call();
lastFuture
.then(
(_) {},
// ignore: avoid_types_on_closure_parameters
onError: (Object _) {},
)
.whenComplete(cancelSubscription.cancel);
}
_cancelSubscription?.abort?.call();

// Prevent super.dispose from cancelling the subscription on the "last"
// stream value, so that it can be sent to `provider.future`.
Expand Down Expand Up @@ -535,60 +528,22 @@ mixin FutureModifierElement<StateT> on ProviderElement<AsyncValue<StateT>> {

extension<T> on Stream<T> {
AsyncSubscription listenAndTrackLast(
void Function(Future<T>, void Function()) last, {
void Function(Future<T>) last, {
required Object Function() lastOrElseError,
required void Function(T event) onData,
required void Function(Object error, StackTrace stackTrace) onError,
required void Function() onDone,
}) {
final completer = Completer<T>();

Result<T>? result;
late StreamSubscription<T> subscription;
subscription = listen(
(event) {
result = Result.data(event);
onData(event);
},
// ignore: avoid_types_on_closure_parameters
onError: (Object error, StackTrace stackTrace) {
result = Result.error(error, stackTrace);
onError(error, stackTrace);
},
onDone: () {
if (result != null) {
switch (result!) {
case ResultData(:final state):
completer.complete(state);
case ResultError(:final error, :final stackTrace):
completer.future.ignore();
completer.completeError(error, stackTrace);
}
} else {
// The error happens after the associated provider is disposed.
// As such, it's normally never read. Reporting this error as uncaught
// would cause too many false-positives. And the edge-cases that
// do reach this error will throw anyway
completer.future.ignore();

completer.completeError(
lastOrElseError(),
StackTrace.current,
);
}

onDone();
},
);
subscription = listen(onData, onError: onError, onDone: onDone);

final asyncSub = (
cancel: subscription.cancel,
pause: subscription.pause,
resume: subscription.resume
resume: subscription.resume,
abort: subscription.cancel,
);

last(completer.future, asyncSub.cancel);

return asyncSub;
}
}
58 changes: 14 additions & 44 deletions packages/riverpod/test/src/providers/stream_notifier_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ void main() {
test('closes the StreamSubscription upon disposing the provider', () async {
final onCancel = OnCancelMock();
final container = ProviderContainer.test();
final cancelCompleter = Completer<void>.sync();
final provider = factory.simpleTestProvider<int>((ref, _) {
final controller = StreamController<int>();
ref.onDispose(() {
Expand All @@ -43,10 +42,7 @@ void main() {

return DelegatingStream(
controller.stream,
onSubscriptionCancel: () {
onCancel.call();
cancelCompleter.complete();
},
onSubscriptionCancel: onCancel.call,
);
});

Expand All @@ -55,12 +51,9 @@ void main() {

container.dispose();

verifyZeroInteractions(onCancel);

await expectLater(future, throwsA(42));
await cancelCompleter.future;

verifyOnly(onCancel, onCancel());

await expectLater(future, throwsStateError);
});

test('Pauses the Stream when the provider is paused', () {
Expand Down Expand Up @@ -690,60 +683,37 @@ void main() {
() async {
final container = ProviderContainer.test();
final completer = Completer<int>.sync();
addTearDown(() => completer.complete(42));
final provider = factory.simpleTestProvider<int>(
(ref, _) => Stream.fromFuture(completer.future),
);

final future = container.read(provider.future);
container.dispose();

completer.complete(42);
expect(future, throwsA(isStateError));

await expectLater(future, completion(42));
container.dispose();
});

test(
'when disposed during loading, resolves with the error of StreamNotifier.build',
() async {
'going data > loading while the future is still pending. '
'Resolves with error', () async {
final container = ProviderContainer.test();
final completer = Completer<int>.sync();
addTearDown(() => completer.complete(42));
final provider = factory.simpleTestProvider<int>(
(ref, _) => Stream.fromFuture(completer.future),
);

container.read(provider);
container.read(provider.notifier).state = const AsyncData(42);
container.read(provider.notifier).state = const AsyncLoading<int>();

final future = container.read(provider.future);
expect(future, throwsA(isStateError));

container.dispose();

completer.completeError(42);

await expectLater(future, throwsA(42));
});

test(
'going data > loading while the future is still pending. '
'Resolves with last future result',
() async {
final container = ProviderContainer.test();
final completer = Completer<int>.sync();
final provider = factory.simpleTestProvider<int>(
(ref, _) => Stream.fromFuture(completer.future),
);

container.read(provider);
container.read(provider.notifier).state = const AsyncData(42);
container.read(provider.notifier).state = const AsyncLoading<int>();

final future = container.read(provider.future);

container.dispose();

completer.complete(42);

await expectLater(future, completion(42));
},
);

test('if going back to loading after future resolved, throws StateError',
() async {
final container = ProviderContainer.test();
Expand Down
10 changes: 7 additions & 3 deletions packages/riverpod_generator/test/stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void main() {
],
);

final result = container.read(publicProvider.future);
final result = container.listen(publicProvider.future, (a, b) {}).read();
expect(await result, 'Hello world');
});

Expand All @@ -54,11 +54,15 @@ void main() {
);

expect(
await container.read(familyProvider(42, third: .42).future),
await container
.listen(familyProvider(42, third: .42).future, (a, b) {})
.read(),
'Hello world 42 null 0.42 true null',
);
expect(
await container.read(familyProvider(21, third: .21).future),
await container
.listen(familyProvider(21, third: .21).future, (a, b) {})
.read(),
'Override',
);
});
Expand Down

0 comments on commit cc80883

Please sign in to comment.