Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

perf(BUX-000): refactorize P2P - send tx to the receiver only once #517

Merged
merged 1 commit into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion record_tx_strategy_outgoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func _outgoingNotifyP2p(ctx context.Context, logger *zerolog.Logger, tx *Transac
Str("txID", tx.ID).
Msg("start p2p")

if err := processP2PTransaction(ctx, tx.syncTransaction, tx); err != nil {
if err := processP2PTransaction(ctx, tx); err != nil {
logger.Error().
Str("txID", tx.ID).
Msgf("processP2PTransaction failed. Reason: %s", err)
Expand Down
87 changes: 40 additions & 47 deletions sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact
}

// processP2PTransaction will process the sync transaction record, or save the failure
func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error {
func processP2PTransaction(ctx context.Context, tx *Transaction) error {
// Successfully capture any panics, convert to readable string and log the error
defer recoverAndLog(syncTx.Client().Logger())
defer recoverAndLog(tx.Client().Logger())

syncTx := tx.syncTransaction
// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
ctx, fmt.Sprintf(lockKeyProcessP2PTx, syncTx.GetID()), syncTx.Client().Cachestore(),
Expand All @@ -286,17 +287,8 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac
return err
}

// Get the transaction
if transaction == nil {
if transaction, err = getTransactionByID(
ctx, "", syncTx.ID, syncTx.GetOptions(false)...,
); err != nil {
return err
}
}

// No draft?
if len(transaction.DraftID) == 0 {
if len(tx.DraftID) == 0 {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusComplete, syncActionP2P, "all", "no draft found, cannot complete p2p",
)
Expand All @@ -305,7 +297,7 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac

// Notify any P2P paymail providers associated to the transaction
var results []*SyncResult
if results, err = _notifyPaymailProviders(ctx, transaction); err != nil {
if results, err = _notifyPaymailProviders(ctx, tx); err != nil {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusReady, syncActionP2P, "", err.Error(),
)
Expand Down Expand Up @@ -339,45 +331,46 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac

// _notifyPaymailProviders will notify any associated Paymail providers
func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*SyncResult, error) {
// First get the draft tx
draftTx, err := getDraftTransactionID(
ctx,
transaction.XPubID,
transaction.DraftID,
transaction.GetOptions(false)...,
)
if err != nil {
return nil, err
} else if draftTx == nil {
return nil, errors.New("draft not found: " + transaction.DraftID)
}

// Loop each output looking for paymail outputs
var attempts []*SyncResult
pm := transaction.Client().PaymailClient()
outputs := transaction.draftTransaction.Configuration.Outputs

notifiedReceivers := make([]string, 0)
results := make([]*SyncResult, len(outputs))

var payload *paymail.P2PTransactionPayload
var err error

for _, out := range draftTx.Configuration.Outputs {
if out.PaymailP4 != nil && out.PaymailP4.ResolutionType == ResolutionTypeP2P {

// Notify each provider with the transaction
if payload, err = finalizeP2PTransaction(
ctx,
pm,
out.PaymailP4,
transaction,
); err != nil {
return nil, err
}
attempts = append(attempts, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: out.PaymailP4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})
for _, out := range outputs {
p4 := out.PaymailP4

if p4 == nil || p4.ResolutionType != ResolutionTypeP2P {
continue
}

receiver := fmt.Sprintf("%s@%s", p4.Alias, p4.Domain)
if contains(notifiedReceivers, func(x string) bool { return x == receiver }) {
continue // no need to send the same transaction to the same receiver second time
}

if payload, err = finalizeP2PTransaction(
ctx,
pm,
p4,
transaction,
); err != nil {
return nil, err
}

notifiedReceivers = append(notifiedReceivers, receiver)
results = append(results, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: p4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})

}
return attempts, nil
return results, nil
}

// utils
Expand Down
14 changes: 14 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,17 @@ func recoverAndLog(log *zerolog.Logger) {
)
}
}

// finds the first element in a collection that satisfies a specified condition.
func find[E any](collection []E, predicate func(E) bool) *E {
for _, v := range collection {
if predicate(v) {
return &v
}
}
return nil
}

func contains[E any](collection []E, predicate func(E) bool) bool{
return find(collection, predicate) != nil
}
Loading