Skip to content

Commit

Permalink
[fix,async] exception in async _CancelOnErrorSubscriptionWrapper.onEr…
Browse files Browse the repository at this point in the history
…ror (#855)
  • Loading branch information
PROGrand authored Jan 28, 2025
1 parent 3b35aca commit 2458e1a
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper

## 2.12.0

- Require Dart 3.4.
Expand Down
9 changes: 7 additions & 2 deletions pkgs/async/lib/src/result/error.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ class ErrorResult implements Result<Never> {
/// has to be a function expecting only one argument, which will be called
/// with only the error.
void handle(Function errorHandler) {
if (errorHandler is ZoneBinaryCallback) {
if (errorHandler is dynamic Function(Object, StackTrace)) {
errorHandler(error, stackTrace);
} else if (errorHandler is dynamic Function(Object)) {
errorHandler(error);
} else {
(errorHandler as ZoneUnaryCallback)(error);
throw ArgumentError(
'is neither Function(Object, StackTrace) nor Function(Object)',
'errorHandler',
);
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkgs/async/lib/src/subscription_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class _CancelOnErrorSubscriptionWrapper<T>
super.onError((Object error, StackTrace stackTrace) {
// Wait for the cancel to complete before sending the error event.
super.cancel().whenComplete(() {
if (handleError is ZoneBinaryCallback) {
if (handleError is dynamic Function(Object, StackTrace)) {
handleError(error, stackTrace);
} else if (handleError != null) {
(handleError as ZoneUnaryCallback)(error);
} else if (handleError is dynamic Function(Object)) {
handleError(error);
}
});
});
Expand Down
2 changes: 1 addition & 1 deletion pkgs/async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.12.0
version: 2.13.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/core/tree/main/pkgs/async
issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync
Expand Down
23 changes: 23 additions & 0 deletions pkgs/async/test/result/result_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ void main() {
expect(called, isTrue);
});

test('handle typed binary', () {
final result = ErrorResult('error', stack);
var called = false;
void f(Object error, StackTrace stackTrace) {
called = true;
}

result.handle(f);
expect(called, isTrue);
});

test('handle typed unary', () {
final result = ErrorResult('error', stack);
var called = false;

void f(Object error) {
called = true;
}

result.handle(f);
expect(called, isTrue);
});

test('handle unary and binary', () {
var result = ErrorResult('error', stack);
var called = false;
Expand Down
70 changes: 70 additions & 0 deletions pkgs/async/test/subscription_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,76 @@ void main() {
});
}
});

group('subscriptionStream error callback', () {
test('binary typed', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

void f(Object error, StackTrace stackTrace) {
completer.complete();
}

subscriptionStream.listen((_) {},
onError: f,
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('binary dynamic', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

subscriptionStream.listen((_) {},
onError: (error, stackTrace) {
completer.complete();
},
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('unary typed', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

void f(Object error) {
completer.complete();
}

subscriptionStream.listen((_) {},
onError: f,
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('unary dynamic', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

subscriptionStream.listen((_) {},
onError: (error) {
completer.complete();
},
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});
});
}

Stream<int> createStream() async* {
Expand Down

0 comments on commit 2458e1a

Please sign in to comment.