Skip to content

Commit

Permalink
fix: ODP service (#1245)
Browse files Browse the repository at this point in the history
  • Loading branch information
ookami-kb authored Jan 22, 2024
1 parent 61d1672 commit 8c72fc7
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ class ODPRepository {
final MyDatabase _db;
final TokenList _tokens;

Future<IList<String>> getNonCompletedPaymentIds() async {
final query = _db.select(_db.oDPRows)
..where(
(p) => p.status.isNotInValues([
ODPStatusDto.success,
ODPStatusDto.txFailure,
ODPStatusDto.txSendFailure,
ODPStatusDto.txWaitFailure,
]),
);

final rows = await query.get();

return rows.map((row) => row.id).toIList();
}

Future<OutgoingDirectPayment?> load(String id) {
final query = _db.select(_db.oDPRows)..where((p) => p.id.equals(id));

Expand Down Expand Up @@ -53,28 +69,6 @@ class ODPRepository {
}

Future<void> clear() => _db.delete(_db.oDPRows).go();

Stream<IList<OutgoingDirectPayment>> watchTxCreated() => _watchWithStatuses([
ODPStatusDto.txCreated,
ODPStatusDto.txSendFailure,
]);

Stream<IList<OutgoingDirectPayment>> watchTxSent() => _watchWithStatuses([
ODPStatusDto.txSent,
ODPStatusDto.txWaitFailure,
]);

Stream<IList<OutgoingDirectPayment>> _watchWithStatuses(
Iterable<ODPStatusDto> statuses,
) {
final query = _db.select(_db.oDPRows)
..where((p) => p.status.isInValues(statuses));

return query
.watch()
.map((rows) => rows.map((row) => row.toModel(_tokens)))
.map((event) => event.toIList());
}
}

class ODPRows extends Table with AmountMixin, EntityMixin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class OutgoingDirectPayment with _$OutgoingDirectPayment {
}

@freezed
class ODPStatus with _$ODPStatus {
sealed class ODPStatus with _$ODPStatus {
/// Tx created, but not sent yet. At this stage, it's safe to recreate it.
const factory ODPStatus.txCreated(
SignedTx tx, {
Expand All @@ -41,3 +41,7 @@ class ODPStatus with _$ODPStatus {
const factory ODPStatus.txFailure({TxFailureReason? reason}) =
ODPStatusTxFailure;
}

extension OutgoingDirectPaymentExt on OutgoingDirectPayment {
bool get isRetriable => status is ODPStatusTxFailure;
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
import 'package:flutter/material.dart';
import 'package:nested/nested.dart';
import 'package:provider/provider.dart';

import '../../di.dart';
import '../accounts/module.dart';
import '../balances/widgets/context_ext.dart';
import 'data/repository.dart';
import 'services/tx_created_watcher.dart';
import 'services/tx_sent_watcher.dart';
import 'widgets/link_listener.dart';

class ODPModule extends SingleChildStatelessWidget {
const ODPModule({super.key, super.child});

@override
Widget buildWithChild(BuildContext context, Widget? child) => MultiProvider(
providers: [
Provider<TxCreatedWatcher>(
lazy: false,
create: (context) => sl<TxCreatedWatcher>()
..call(onBalanceAffected: () => context.notifyBalanceAffected()),
dispose: (_, value) => value.dispose(),
),
Provider<TxSentWatcher>(
lazy: false,
create: (context) => sl<TxSentWatcher>()
..call(onBalanceAffected: () => context.notifyBalanceAffected()),
dispose: (_, value) => value.dispose(),
),
],
child: LogoutListener(
onLogout: (_) => sl<ODPRepository>().clear(),
child: ODPLinkListener(child: child ?? const SizedBox.shrink()),
),
Widget buildWithChild(BuildContext context, Widget? child) => LogoutListener(
onLogout: (_) => sl<ODPRepository>().clear(),
child: ODPLinkListener(child: child ?? const SizedBox.shrink()),
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class _ODPDetailsScreenState extends State<ODPDetailsScreen> {
),
txFailure: (it) => TransferError(
onBack: () => context.router.pop(),
onRetry: () => context.retryODP(payment: payment),
onRetry: () => context.retryODP(paymentId: payment.id),
reason: it.reason,
),
orElse: () => TransferProgress(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:dfunc/dfunc.dart';
import 'package:espressocash_api/espressocash_api.dart';
import 'package:injectable/injectable.dart';
Expand All @@ -8,17 +10,31 @@ import 'package:uuid/uuid.dart';
import '../../../config.dart';
import '../../../core/amount.dart';
import '../../accounts/models/ec_wallet.dart';
import '../../authenticated/auth_scope.dart';
import '../../transactions/models/tx_results.dart';
import '../../transactions/services/resign_tx.dart';
import '../../transactions/services/tx_sender.dart';
import '../data/repository.dart';
import '../models/outgoing_direct_payment.dart';

@injectable
@Singleton(scope: authScope)
class ODPService {
const ODPService(this._client, this._repository);
ODPService(this._client, this._repository, this._txSender);

final CryptopleaseClient _client;
final ODPRepository _repository;
final TxSender _txSender;

final Map<String, StreamSubscription<void>> _subscriptions = {};

@PostConstruct(preResolve: true)
Future<void> init() async {
final nonCompletedPayments = await _repository.getNonCompletedPaymentIds();

for (final payment in nonCompletedPayments) {
_subscribe(payment);
}
}

Future<OutgoingDirectPayment> create({
required ECWallet account,
Expand All @@ -45,14 +61,18 @@ class ODPService {
);

await _repository.save(payment);
_subscribe(id);

return payment;
}

Future<OutgoingDirectPayment> retry(
OutgoingDirectPayment payment, {
Future<void> retry(
String paymentId, {
required ECWallet account,
}) async {
final payment = await _repository.load(paymentId);
if (payment == null || !payment.isRetriable) return;

final status = await _createTx(
account: account,
receiver: payment.receiver,
Expand All @@ -63,8 +83,7 @@ class ODPService {
final newPayment = payment.copyWith(status: status);

await _repository.save(newPayment);

return newPayment;
_subscribe(newPayment.id);
}

Future<ODPStatus> _createTx({
Expand Down Expand Up @@ -94,4 +113,62 @@ class ODPService {
);
}
}

void _subscribe(String paymentId) {
_subscriptions[paymentId] = _repository
.watch(paymentId)
.asyncExpand<OutgoingDirectPayment?>((payment) {
switch (payment.status) {
case ODPStatusTxCreated():
return _send(payment).asStream();
case ODPStatusTxSent():
return _wait(payment).asStream();
case ODPStatusSuccess():
case ODPStatusTxFailure():
_subscriptions.remove(paymentId)?.cancel();

return null;
}
}).listen((payment) => payment?.let(_repository.save));
}

Future<OutgoingDirectPayment> _send(OutgoingDirectPayment payment) async {
final status = payment.status;
if (status is! ODPStatusTxCreated) {
return payment;
}

final tx = await _txSender.send(status.tx, minContextSlot: status.slot);

final ODPStatus? newStatus = tx.map(
sent: (_) => ODPStatus.txSent(
status.tx,
slot: status.slot,
),
invalidBlockhash: (_) => const ODPStatus.txFailure(
reason: TxFailureReason.invalidBlockhashSending,
),
failure: (it) => ODPStatus.txFailure(reason: it.reason),
networkError: (_) => null,
);

return newStatus == null ? payment : payment.copyWith(status: newStatus);
}

Future<OutgoingDirectPayment> _wait(OutgoingDirectPayment payment) async {
final status = payment.status;
if (status is! ODPStatusTxSent) {
return payment;
}

final tx = await _txSender.wait(status.tx, minContextSlot: status.slot);

final ODPStatus? newStatus = tx.map(
success: (_) => ODPStatus.success(txId: status.tx.id),
failure: (tx) => ODPStatus.txFailure(reason: tx.reason),
networkError: (_) => null,
);

return newStatus == null ? payment : payment.copyWith(status: newStatus);
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 8c72fc7

Please sign in to comment.