From 8c72fc7d0bd445124ac59c35742e7662fc5d8f27 Mon Sep 17 00:00:00 2001 From: Kirill Bubochkin Date: Mon, 22 Jan 2024 23:51:10 +0100 Subject: [PATCH] fix: ODP service (#1245) --- .../data/repository.dart | 38 ++++---- .../models/outgoing_direct_payment.dart | 6 +- .../outgoing_direct_payments/module.dart | 26 +----- .../screens/odp_details_screen.dart | 2 +- .../services/odp_service.dart | 89 +++++++++++++++++-- .../services/payment_watcher.dart | 74 --------------- .../services/tx_created_watcher.dart | 64 ------------- .../services/tx_sent_watcher.dart | 56 ------------ .../widgets/extensions.dart | 5 +- .../transactions/services/tx_sender.dart | 85 +++++++++++------- .../services/odp_service_test.dart | 35 +++----- 11 files changed, 174 insertions(+), 306 deletions(-) delete mode 100644 packages/espressocash_app/lib/features/outgoing_direct_payments/services/payment_watcher.dart delete mode 100644 packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_created_watcher.dart delete mode 100644 packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_sent_watcher.dart diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/data/repository.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/data/repository.dart index 6862704325..b543eaa214 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/data/repository.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/data/repository.dart @@ -23,6 +23,22 @@ class ODPRepository { final MyDatabase _db; final TokenList _tokens; + Future> getNonCompletedPaymentIds() async { + final query = _db.select(_db.oDPRows) + ..where( + (p) => p.status.isNotInValues([ + ODPStatusDto.success, + ODPStatusDto.txFailure, + ODPStatusDto.txSendFailure, + ODPStatusDto.txWaitFailure, + ]), + ); + + final rows = await query.get(); + + return rows.map((row) => row.id).toIList(); + } + Future load(String id) { final query = _db.select(_db.oDPRows)..where((p) => p.id.equals(id)); @@ -53,28 +69,6 @@ class ODPRepository { } Future clear() => _db.delete(_db.oDPRows).go(); - - Stream> watchTxCreated() => _watchWithStatuses([ - ODPStatusDto.txCreated, - ODPStatusDto.txSendFailure, - ]); - - Stream> watchTxSent() => _watchWithStatuses([ - ODPStatusDto.txSent, - ODPStatusDto.txWaitFailure, - ]); - - Stream> _watchWithStatuses( - Iterable statuses, - ) { - final query = _db.select(_db.oDPRows) - ..where((p) => p.status.isInValues(statuses)); - - return query - .watch() - .map((rows) => rows.map((row) => row.toModel(_tokens))) - .map((event) => event.toIList()); - } } class ODPRows extends Table with AmountMixin, EntityMixin { diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/models/outgoing_direct_payment.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/models/outgoing_direct_payment.dart index 287c699d67..62dd295feb 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/models/outgoing_direct_payment.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/models/outgoing_direct_payment.dart @@ -20,7 +20,7 @@ class OutgoingDirectPayment with _$OutgoingDirectPayment { } @freezed -class ODPStatus with _$ODPStatus { +sealed class ODPStatus with _$ODPStatus { /// Tx created, but not sent yet. At this stage, it's safe to recreate it. const factory ODPStatus.txCreated( SignedTx tx, { @@ -41,3 +41,7 @@ class ODPStatus with _$ODPStatus { const factory ODPStatus.txFailure({TxFailureReason? reason}) = ODPStatusTxFailure; } + +extension OutgoingDirectPaymentExt on OutgoingDirectPayment { + bool get isRetriable => status is ODPStatusTxFailure; +} diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/module.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/module.dart index af0b9702ed..7d14399d1b 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/module.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/module.dart @@ -1,37 +1,17 @@ import 'package:flutter/material.dart'; import 'package:nested/nested.dart'; -import 'package:provider/provider.dart'; import '../../di.dart'; import '../accounts/module.dart'; -import '../balances/widgets/context_ext.dart'; import 'data/repository.dart'; -import 'services/tx_created_watcher.dart'; -import 'services/tx_sent_watcher.dart'; import 'widgets/link_listener.dart'; class ODPModule extends SingleChildStatelessWidget { const ODPModule({super.key, super.child}); @override - Widget buildWithChild(BuildContext context, Widget? child) => MultiProvider( - providers: [ - Provider( - lazy: false, - create: (context) => sl() - ..call(onBalanceAffected: () => context.notifyBalanceAffected()), - dispose: (_, value) => value.dispose(), - ), - Provider( - lazy: false, - create: (context) => sl() - ..call(onBalanceAffected: () => context.notifyBalanceAffected()), - dispose: (_, value) => value.dispose(), - ), - ], - child: LogoutListener( - onLogout: (_) => sl().clear(), - child: ODPLinkListener(child: child ?? const SizedBox.shrink()), - ), + Widget buildWithChild(BuildContext context, Widget? child) => LogoutListener( + onLogout: (_) => sl().clear(), + child: ODPLinkListener(child: child ?? const SizedBox.shrink()), ); } diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/screens/odp_details_screen.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/screens/odp_details_screen.dart index e915b61374..bf5a15ae93 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/screens/odp_details_screen.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/screens/odp_details_screen.dart @@ -65,7 +65,7 @@ class _ODPDetailsScreenState extends State { ), txFailure: (it) => TransferError( onBack: () => context.router.pop(), - onRetry: () => context.retryODP(payment: payment), + onRetry: () => context.retryODP(paymentId: payment.id), reason: it.reason, ), orElse: () => TransferProgress( diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/odp_service.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/services/odp_service.dart index c5b0ce21d6..8df379b511 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/odp_service.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/services/odp_service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:dfunc/dfunc.dart'; import 'package:espressocash_api/espressocash_api.dart'; import 'package:injectable/injectable.dart'; @@ -8,17 +10,31 @@ import 'package:uuid/uuid.dart'; import '../../../config.dart'; import '../../../core/amount.dart'; import '../../accounts/models/ec_wallet.dart'; +import '../../authenticated/auth_scope.dart'; import '../../transactions/models/tx_results.dart'; import '../../transactions/services/resign_tx.dart'; +import '../../transactions/services/tx_sender.dart'; import '../data/repository.dart'; import '../models/outgoing_direct_payment.dart'; -@injectable +@Singleton(scope: authScope) class ODPService { - const ODPService(this._client, this._repository); + ODPService(this._client, this._repository, this._txSender); final CryptopleaseClient _client; final ODPRepository _repository; + final TxSender _txSender; + + final Map> _subscriptions = {}; + + @PostConstruct(preResolve: true) + Future init() async { + final nonCompletedPayments = await _repository.getNonCompletedPaymentIds(); + + for (final payment in nonCompletedPayments) { + _subscribe(payment); + } + } Future create({ required ECWallet account, @@ -45,14 +61,18 @@ class ODPService { ); await _repository.save(payment); + _subscribe(id); return payment; } - Future retry( - OutgoingDirectPayment payment, { + Future retry( + String paymentId, { required ECWallet account, }) async { + final payment = await _repository.load(paymentId); + if (payment == null || !payment.isRetriable) return; + final status = await _createTx( account: account, receiver: payment.receiver, @@ -63,8 +83,7 @@ class ODPService { final newPayment = payment.copyWith(status: status); await _repository.save(newPayment); - - return newPayment; + _subscribe(newPayment.id); } Future _createTx({ @@ -94,4 +113,62 @@ class ODPService { ); } } + + void _subscribe(String paymentId) { + _subscriptions[paymentId] = _repository + .watch(paymentId) + .asyncExpand((payment) { + switch (payment.status) { + case ODPStatusTxCreated(): + return _send(payment).asStream(); + case ODPStatusTxSent(): + return _wait(payment).asStream(); + case ODPStatusSuccess(): + case ODPStatusTxFailure(): + _subscriptions.remove(paymentId)?.cancel(); + + return null; + } + }).listen((payment) => payment?.let(_repository.save)); + } + + Future _send(OutgoingDirectPayment payment) async { + final status = payment.status; + if (status is! ODPStatusTxCreated) { + return payment; + } + + final tx = await _txSender.send(status.tx, minContextSlot: status.slot); + + final ODPStatus? newStatus = tx.map( + sent: (_) => ODPStatus.txSent( + status.tx, + slot: status.slot, + ), + invalidBlockhash: (_) => const ODPStatus.txFailure( + reason: TxFailureReason.invalidBlockhashSending, + ), + failure: (it) => ODPStatus.txFailure(reason: it.reason), + networkError: (_) => null, + ); + + return newStatus == null ? payment : payment.copyWith(status: newStatus); + } + + Future _wait(OutgoingDirectPayment payment) async { + final status = payment.status; + if (status is! ODPStatusTxSent) { + return payment; + } + + final tx = await _txSender.wait(status.tx, minContextSlot: status.slot); + + final ODPStatus? newStatus = tx.map( + success: (_) => ODPStatus.success(txId: status.tx.id), + failure: (tx) => ODPStatus.txFailure(reason: tx.reason), + networkError: (_) => null, + ); + + return newStatus == null ? payment : payment.copyWith(status: newStatus); + } } diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/payment_watcher.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/services/payment_watcher.dart deleted file mode 100644 index d6f369bd02..0000000000 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/payment_watcher.dart +++ /dev/null @@ -1,74 +0,0 @@ -import 'dart:async'; - -import 'package:async/async.dart'; -import 'package:dfunc/dfunc.dart'; -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:flutter/foundation.dart'; - -import '../../../core/cancelable_job.dart'; -import '../data/repository.dart'; -import '../models/outgoing_direct_payment.dart'; - -abstract class PaymentWatcher { - PaymentWatcher(this._repository); - - final ODPRepository _repository; - - StreamSubscription? _repoSubscription; - final Map> _operations = {}; - - @protected - Stream> watchPayments( - ODPRepository repository, - ); - - @protected - CancelableJob createJob( - OutgoingDirectPayment payment, - ); - - void call({required VoidCallback onBalanceAffected}) { - _repoSubscription = - watchPayments(_repository).distinct().listen((payments) async { - final keys = payments.map((e) => e.id).toSet(); - for (final key in _operations.keys.toSet()) { - if (!keys.contains(key)) { - await _operations[key]?.cancel(); - } - } - - for (final payment in payments) { - final job = _operations[payment.id]; - if (job != null) return; - - _operations[payment.id] = - createJob(payment).call().then((newPayment) async { - if (payment != newPayment) { - if (newPayment.status.affectsBalance) { - onBalanceAffected(); - } - await _repository.save(newPayment); - } - _operations.remove(payment.id); - }); - } - }); - } - - @mustCallSuper - void dispose() { - _repoSubscription?.cancel(); - for (final subscription in _operations.values) { - subscription.cancel(); - } - } -} - -extension on ODPStatus { - bool get affectsBalance => this.map( - txCreated: F, - txSent: F, - txFailure: F, - success: T, - ); -} diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_created_watcher.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_created_watcher.dart deleted file mode 100644 index 6ccad1fa1f..0000000000 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_created_watcher.dart +++ /dev/null @@ -1,64 +0,0 @@ -import 'dart:async'; - -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:injectable/injectable.dart'; - -import '../../../core/cancelable_job.dart'; -import '../../transactions/models/tx_results.dart'; -import '../../transactions/services/tx_sender.dart'; -import '../data/repository.dart'; -import '../models/outgoing_direct_payment.dart'; -import 'payment_watcher.dart'; - -/// Watches for [ODPStatus.txCreated] payments and and sends the tx. -/// -/// The watcher will try to submit the tx until it's accepted or rejected. -@injectable -class TxCreatedWatcher extends PaymentWatcher { - TxCreatedWatcher(super._repository, this._sender); - - final TxSender _sender; - - @override - CancelableJob createJob( - OutgoingDirectPayment payment, - ) => - _ODPTxCreatedJob(payment, _sender); - - @override - Stream> watchPayments( - ODPRepository repository, - ) => - repository.watchTxCreated(); -} - -class _ODPTxCreatedJob extends CancelableJob { - const _ODPTxCreatedJob(this.payment, this.sender); - - final OutgoingDirectPayment payment; - final TxSender sender; - - @override - Future process() async { - final status = payment.status; - if (status is! ODPStatusTxCreated) { - return payment; - } - - final tx = await sender.send(status.tx, minContextSlot: status.slot); - - final ODPStatus? newStatus = tx.map( - sent: (_) => ODPStatus.txSent( - status.tx, - slot: status.slot, - ), - invalidBlockhash: (_) => const ODPStatus.txFailure( - reason: TxFailureReason.invalidBlockhashSending, - ), - failure: (it) => ODPStatus.txFailure(reason: it.reason), - networkError: (_) => null, - ); - - return newStatus == null ? null : payment.copyWith(status: newStatus); - } -} diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_sent_watcher.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_sent_watcher.dart deleted file mode 100644 index 1d925f3a35..0000000000 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/services/tx_sent_watcher.dart +++ /dev/null @@ -1,56 +0,0 @@ -import 'dart:async'; - -import 'package:fast_immutable_collections/fast_immutable_collections.dart'; -import 'package:injectable/injectable.dart'; - -import '../../../core/cancelable_job.dart'; -import '../../transactions/services/tx_sender.dart'; -import '../data/repository.dart'; -import '../models/outgoing_direct_payment.dart'; -import 'payment_watcher.dart'; - -/// Watches for [ODPStatus.txSent] payments and waits for the tx to be -/// confirmed. -@injectable -class TxSentWatcher extends PaymentWatcher { - TxSentWatcher(super._repository, this._sender); - - final TxSender _sender; - - @override - CancelableJob createJob( - OutgoingDirectPayment payment, - ) => - _ODPTxSentJob(payment, _sender); - - @override - Stream> watchPayments( - ODPRepository repository, - ) => - repository.watchTxSent(); -} - -class _ODPTxSentJob extends CancelableJob { - const _ODPTxSentJob(this.payment, this.sender); - - final OutgoingDirectPayment payment; - final TxSender sender; - - @override - Future process() async { - final status = payment.status; - if (status is! ODPStatusTxSent) { - return payment; - } - - final tx = await sender.wait(status.tx, minContextSlot: BigInt.zero); - - final ODPStatus? newStatus = tx.map( - success: (_) => ODPStatus.success(txId: status.tx.id), - failure: (tx) => ODPStatus.txFailure(reason: tx.reason), - networkError: (_) => null, - ); - - return newStatus == null ? null : payment.copyWith(status: newStatus); - } -} diff --git a/packages/espressocash_app/lib/features/outgoing_direct_payments/widgets/extensions.dart b/packages/espressocash_app/lib/features/outgoing_direct_payments/widgets/extensions.dart index 8418586417..e108ae6bb2 100644 --- a/packages/espressocash_app/lib/features/outgoing_direct_payments/widgets/extensions.dart +++ b/packages/espressocash_app/lib/features/outgoing_direct_payments/widgets/extensions.dart @@ -9,7 +9,6 @@ import '../../../di.dart'; import '../../../ui/loader.dart'; import '../../accounts/models/account.dart'; import '../../analytics/analytics_manager.dart'; -import '../models/outgoing_direct_payment.dart'; import '../services/odp_service.dart'; extension BuildContextExt on BuildContext { @@ -35,10 +34,10 @@ extension BuildContextExt on BuildContext { return payment.id; }); - Future retryODP({required OutgoingDirectPayment payment}) => + Future retryODP({required String paymentId}) => runWithLoader(this, () async { await sl().retry( - payment, + paymentId, account: read().wallet, ); sl().directPaymentCreated(); diff --git a/packages/espressocash_app/lib/features/transactions/services/tx_sender.dart b/packages/espressocash_app/lib/features/transactions/services/tx_sender.dart index 0d61e1217d..d532b022f2 100644 --- a/packages/espressocash_app/lib/features/transactions/services/tx_sender.dart +++ b/packages/espressocash_app/lib/features/transactions/services/tx_sender.dart @@ -1,5 +1,5 @@ -import 'package:dfunc/dfunc.dart'; import 'package:injectable/injectable.dart'; +import 'package:logging/logging.dart'; import 'package:rxdart/rxdart.dart'; import 'package:solana/dto.dart'; import 'package:solana/encoder.dart'; @@ -70,6 +70,7 @@ class TxSender { final start = DateTime.now(); Future getSignatureStatus() async { + _logger.fine('${tx.id}: Checking tx status.'); // We need to check blockhash validity before searching for tx to make // sure that it's valid for the tx response slot. // ignore: move-variable-closer-to-its-usage @@ -86,10 +87,18 @@ class TxSender { final t = statuses.value.first; if (t == null) { + _logger.fine('${tx.id}: Tx not found.'); + // Blockhash is still valid, tx can be submitted. - if (blockhashValidity.value) return null; + if (blockhashValidity.value) { + _logger.fine('${tx.id}: Blockhash is still valid.'); + + return null; + } if (DateTime.now().difference(start).inSeconds > 90) { + _logger.fine('${tx.id}: Timeout, failing.'); + // We've been waiting for too long, blockhash is invalid and it // won't be valid. return const TxWaitResult.failure( @@ -99,15 +108,29 @@ class TxSender { // No minContextSlot, it's not safe to assume that we get the latest // status. - if (minContextSlot == BigInt.zero) return null; + if (minContextSlot == BigInt.zero) { + _logger.fine('${tx.id}: minContextSlot is zero.'); + + return null; + } // We were calling the status with too old slot, we cannot be sure // that tx was not submitted. - if (statuses.context.slot < minContextSlot) return null; + if (statuses.context.slot < minContextSlot) { + _logger.fine('${tx.id}: minContextSlot not reached.'); + + return null; + } // We were calling the status with too old slot, blockhash validity // is not guaranteed. - if (statuses.context.slot < blockhashValidity.context.slot) return null; + if (statuses.context.slot < blockhashValidity.context.slot) { + _logger.fine('${tx.id}: Blockhas validity slot is ahead.'); + + return null; + } + + _logger.fine('${tx.id}: Ivalid blockhash.'); // At this stage, blockhash is invalid and it won't be valid, so // tx cannot be submitted. @@ -117,15 +140,22 @@ class TxSender { } if (t.err != null) { + _logger.fine('${tx.id}: Tx error ${t.err}.'); + return const TxWaitResult.failure(reason: TxFailureReason.txError); } if (t.confirmationStatus.index >= ConfirmationStatus.confirmed.index) { + _logger.fine('${tx.id}: Success.'); + return const TxWaitResult.success(); } + + _logger + .fine('${tx.id}: Wrong confirmation status ${t.confirmationStatus}.'); } - Future waitForSignatureStatus() async { + Future waitForSignatureStatus() async { try { await _client.waitForSignatureStatus( tx.id, @@ -134,22 +164,27 @@ class TxSender { timeout: waitForSignatureDefaultTimeout, ); + _logger.fine('${tx.id}: Success from WS.'); + return const TxWaitResult.success(); - } on SubscriptionClientException { + } on SubscriptionClientException catch (error) { + _logger.fine('${tx.id}: Failure from WS $error.'); + return const TxWaitResult.failure(reason: TxFailureReason.txError); - } on Exception { - return const TxWaitResult.networkError(); + } on Exception catch (error) { + _logger.fine('${tx.id}: Network error from WS $error.'); + + return null; } } - final polling = _createPolling( - createSource: () => getSignatureStatus().asStream(), - ); + final polling = Stream.periodic(const Duration(seconds: 10)) + .startWith(null) + .exhaustMap((_) => getSignatureStatus().asStream().onErrorReturn(null)); - return Future.any([ - polling.whereNotNull().first, - waitForSignatureStatus(), - ]); + return MergeStream([polling, waitForSignatureStatus().asStream()]) + .whereNotNull() + .first; } } @@ -173,20 +208,4 @@ extension on JsonRpcException { } } -Stream _createPolling({required Func0> createSource}) { - Duration backoff = const Duration(seconds: 1); - - Stream retryWhen(void _, void __) async* { - await Future.delayed(backoff); - if (backoff < const Duration(seconds: 30)) backoff *= 2; - - yield null; - } - - return RetryWhenStream( - () => Stream.periodic(const Duration(seconds: 10)) - .startWith(null) - .flatMap((_) => createSource()), - retryWhen, - ); -} +final _logger = Logger('TxSender'); diff --git a/packages/espressocash_app/test/features/outgoing_direct_payments/services/odp_service_test.dart b/packages/espressocash_app/test/features/outgoing_direct_payments/services/odp_service_test.dart index 3ceb4e899d..ff47735e43 100644 --- a/packages/espressocash_app/test/features/outgoing_direct_payments/services/odp_service_test.dart +++ b/packages/espressocash_app/test/features/outgoing_direct_payments/services/odp_service_test.dart @@ -9,8 +9,6 @@ import 'package:espressocash_app/features/accounts/models/ec_wallet.dart'; import 'package:espressocash_app/features/outgoing_direct_payments/data/repository.dart'; import 'package:espressocash_app/features/outgoing_direct_payments/models/outgoing_direct_payment.dart'; import 'package:espressocash_app/features/outgoing_direct_payments/services/odp_service.dart'; -import 'package:espressocash_app/features/outgoing_direct_payments/services/tx_created_watcher.dart'; -import 'package:espressocash_app/features/outgoing_direct_payments/services/tx_sent_watcher.dart'; import 'package:espressocash_app/features/tokens/token.dart'; import 'package:espressocash_app/features/transactions/models/tx_results.dart'; import 'package:espressocash_app/features/transactions/services/tx_sender.dart'; @@ -29,9 +27,6 @@ final client = MockCryptopleaseClient(); @GenerateMocks([TxSender, CryptopleaseClient]) Future main() async { - late TxCreatedWatcher txCreatedWatcher; - late TxSentWatcher txSentWatcher; - final account = LocalWallet(await Ed25519HDKeyPair.random()); final receiver = await Ed25519HDKeyPair.random(); final repository = MemoryRepository(); @@ -39,17 +34,10 @@ Future main() async { setUp(() { reset(sender); reset(client); - - txCreatedWatcher = TxCreatedWatcher(repository, sender) - ..call(onBalanceAffected: ignore); - txSentWatcher = TxSentWatcher(repository, sender) - ..call(onBalanceAffected: ignore); }); tearDown( () async { - txCreatedWatcher.dispose(); - txSentWatcher.dispose(); await repository.clear(); }, ); @@ -82,7 +70,7 @@ Future main() async { cryptoCurrency: CryptoCurrency(token: Token.usdc), ); - ODPService createService() => ODPService(client, repository); + ODPService createService() => ODPService(client, repository, sender); Future createODP(ODPService service) async { final payment = await service.create( @@ -182,16 +170,17 @@ class MemoryRepository implements ODPRepository { _data.stream.map((it) => it[id]!); @override - Stream> watchTxCreated() => _data.stream.map( + Future> getNonCompletedPaymentIds() => _data.stream + .map( (it) => it.values - .where((it) => it.status.maybeMap(orElse: F, txCreated: T)) + .where( + (it) => switch (it.status) { + ODPStatusTxCreated() || ODPStatusTxSent() => true, + ODPStatusSuccess() || ODPStatusTxFailure() => false, + }, + ) + .map((e) => e.id) .toIList(), - ); - - @override - Stream> watchTxSent() => _data.stream.map( - (it) => it.values - .where((it) => it.status.maybeMap(orElse: F, txSent: T)) - .toIList(), - ); + ) + .first; }