From 186eae861d3fef40d8f48361d4057b33ef2d53bd Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Wed, 8 Nov 2023 21:37:03 +0100 Subject: [PATCH 01/10] fix(250): return error if broadcasting fail --- record_tx_strategy_external_incoming_tx.go | 2 +- sync_tx_service.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go index a0dec99a..1a8fc7d3 100644 --- a/record_tx_strategy_external_incoming_tx.go +++ b/record_tx_strategy_external_incoming_tx.go @@ -28,7 +28,7 @@ func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, op logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID)) - if transaction.syncTransaction.BroadcastStatus == SyncStatusReady { + if tx.BroadcastNow || transaction.syncTransaction.BroadcastStatus == SyncStatusReady { _externalIncomingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcaset in a cron task } diff --git a/sync_tx_service.go b/sync_tx_service.go index b815ccab..5f3da9eb 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -171,7 +171,7 @@ func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) erro _bailAndSaveSyncTransaction( ctx, syncTx, SyncStatusError, syncActionBroadcast, provider, "broadcast error: "+err.Error(), ) - return nil //nolint:nolintlint,nilerr // error is not needed + return err } // Create status message From 3e666a660c677fdaca67fd444c5f12da6bbdec17 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Wed, 8 Nov 2023 22:37:06 +0100 Subject: [PATCH 02/10] fix(BUX-250): fail on broadcast on BEEF --- paymail_service_provider.go | 4 ++ record_tx.go | 5 +- record_tx_strategy_external_incoming_tx.go | 62 +++++++++++++++------- record_tx_strategy_internal_incoming_tx.go | 60 ++++++++++++++------- 4 files changed, 89 insertions(+), 42 deletions(-) diff --git a/paymail_service_provider.go b/paymail_service_provider.go index 0035dee1..3ce95cc5 100644 --- a/paymail_service_provider.go +++ b/paymail_service_provider.go @@ -163,6 +163,10 @@ func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context, rts.(recordIncomingTxStrategy).ForceBroadcast(true) + if p2pTx.Beef != "" { + rts.(recordIncomingTxStrategy).FailOnBroadcastError(true) + } + transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata)) if err != nil { return nil, err diff --git a/record_tx.go b/record_tx.go index c51811f8..87914919 100644 --- a/record_tx.go +++ b/record_tx.go @@ -14,6 +14,7 @@ type recordTxStrategy interface { type recordIncomingTxStrategy interface { ForceBroadcast(force bool) + FailOnBroadcastError(forceFail bool) } func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) { @@ -63,12 +64,12 @@ func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex s if tx != nil { rts = &internalIncomingTx{ Tx: tx, - BroadcastNow: false, + broadcastNow: false, } } else { rts = &externalIncomingTx{ Hex: txHex, - BroadcastNow: false, + broadcastNow: false, } } diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go index 1a8fc7d3..f1cf500e 100644 --- a/record_tx_strategy_external_incoming_tx.go +++ b/record_tx_strategy_external_incoming_tx.go @@ -9,27 +9,35 @@ import ( ) type externalIncomingTx struct { - Hex string - BroadcastNow bool // e.g. BEEF must be broadcasted now + Hex string + broadcastNow bool // e.g. BEEF must be broadcasted now + allowBroadcastErrors bool // only BEEF cannot allow for broadcast errors } -func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { +func (strategy *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { logger := c.Logger() // process - if !tx.BroadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it - return _addTxToCheck(ctx, tx, c, opts) + if !strategy.broadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it + return _addTxToCheck(ctx, strategy, c, opts) } - transaction, err := _createExternalTxToRecord(ctx, tx, c, opts) + transaction, err := _createExternalTxToRecord(ctx, strategy, c, opts) if err != nil { return nil, fmt.Errorf("ExternalIncomingTx.Execute(): creation of external incoming tx failed. Reason: %w", err) } logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID)) - if tx.BroadcastNow || transaction.syncTransaction.BroadcastStatus == SyncStatusReady { - _externalIncomingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcaset in a cron task + if strategy.broadcastNow || transaction.syncTransaction.BroadcastStatus == SyncStatusReady { + + err := _externalIncomingBroadcast(ctx, logger, transaction, strategy.allowBroadcastErrors) + if err != nil { + logger. + Error(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %s, TxID: %s", err, transaction.ID)) + + return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w, TxID: %s", err, transaction.ID) + } } // record @@ -41,21 +49,25 @@ func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, op return transaction, nil } -func (tx *externalIncomingTx) Validate() error { - if tx.Hex == "" { +func (strategy *externalIncomingTx) Validate() error { + if strategy.Hex == "" { return ErrMissingFieldHex } return nil // is valid } -func (tx *externalIncomingTx) TxID() string { - btTx, _ := bt.NewTxFromString(tx.Hex) +func (strategy *externalIncomingTx) TxID() string { + btTx, _ := bt.NewTxFromString(strategy.Hex) return btTx.TxID() } -func (tx *externalIncomingTx) ForceBroadcast(force bool) { - tx.BroadcastNow = force +func (strategy *externalIncomingTx) ForceBroadcast(force bool) { + strategy.broadcastNow = force +} + +func (strategy *externalIncomingTx) FailOnBroadcastError(forceFail bool) { + strategy.allowBroadcastErrors = !forceFail } func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { @@ -117,15 +129,25 @@ func _hydrateExternalWithSync(tx *Transaction) { tx.syncTransaction = sync } -func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) { +func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction, allowErrors bool) error { logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start broadcast, TxID: %s", tx.ID)) - if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil { - // ignore error, transaction will be broadcaset in a cron task - logger. - Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID)) - } else { + err := broadcastSyncTransaction(ctx, tx.syncTransaction) + + if err == nil { logger. Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcast complete, TxID: %s", tx.ID)) + + return nil } + + if allowErrors { + logger. + Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, tx.ID)) + + // ignore error, transaction will be broadcaset in a cron task + return nil + } + + return err } diff --git a/record_tx_strategy_internal_incoming_tx.go b/record_tx_strategy_internal_incoming_tx.go index 7fedbafb..7e5a3ac8 100644 --- a/record_tx_strategy_internal_incoming_tx.go +++ b/record_tx_strategy_internal_incoming_tx.go @@ -9,69 +9,89 @@ import ( ) type internalIncomingTx struct { - Tx *Transaction - BroadcastNow bool // e.g. BEEF must be broadcasted now + Tx *Transaction + broadcastNow bool // e.g. BEEF must be broadcasted now + allowBroadcastErrors bool // only BEEF cannot allow for broadcast errors } -func (tx *internalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { +func (strategy *internalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { logger := c.Logger() - logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start, TxID: %s", tx.Tx.ID)) + logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start, TxID: %s", strategy.Tx.ID)) // process - transaction := tx.Tx + transaction := strategy.Tx syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...) if err != nil { return nil, fmt.Errorf("InternalIncomingTx.Execute(): getting syncTx failed. Reason: %w", err) } - if tx.BroadcastNow || syncTx.BroadcastStatus == SyncStatusReady { + if strategy.broadcastNow || syncTx.BroadcastStatus == SyncStatusReady { syncTx.transaction = transaction transaction.syncTransaction = syncTx - _internalIncomingBroadcast(ctx, logger, transaction) // ignore broadcast error - will be repeted by task manager + err := _internalIncomingBroadcast(ctx, logger, transaction, strategy.allowBroadcastErrors) + if err != nil { + logger. + Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %s, TxID: %s", err, transaction.ID)) + + return nil, fmt.Errorf("InternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w, TxID: %s", err, transaction.ID) + } } logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): complete, TxID: %s", transaction.ID)) return transaction, nil } -func (tx *internalIncomingTx) Validate() error { - if tx.Tx == nil { +func (strategy *internalIncomingTx) Validate() error { + if strategy.Tx == nil { return errors.New("Tx cannot be nil") } return nil // is valid } -func (tx *internalIncomingTx) TxID() string { - return tx.Tx.ID +func (strategy *internalIncomingTx) TxID() string { + return strategy.Tx.ID +} + +func (strategy *internalIncomingTx) ForceBroadcast(force bool) { + strategy.broadcastNow = force } -func (tx *internalIncomingTx) ForceBroadcast(force bool) { - tx.BroadcastNow = force +func (strategy *internalIncomingTx) FailOnBroadcastError(forceFail bool) { + strategy.allowBroadcastErrors = !forceFail } -func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, transaction *Transaction) { +func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, transaction *Transaction, allowErrors bool) error { logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start broadcast, TxID: %s", transaction.ID)) syncTx := transaction.syncTransaction err := broadcastSyncTransaction(ctx, syncTx) - if err != nil { + + if err == nil { + logger. + Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID)) + + return nil + } + + if allowErrors { logger. - Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID)) + Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, transaction.ID)) + // TODO: do I really need this? if syncTx.BroadcastStatus == SyncStatusSkipped { // revert status to ready after fail to re-run broadcasting, this can happen when we received internal BEEF tx syncTx.BroadcastStatus = SyncStatusReady if err = syncTx.Save(ctx); err != nil { logger. - Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Pending to Ready failed. Reason: %s, TxID: %s", err, transaction.ID)) + Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Skipped to Ready failed. Reason: %s, TxID: %s", err, transaction.ID)) } } // ignore broadcast error - will be repeted by task manager - } else { - logger. - Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID)) + return nil } + + return err } From 33f6587405263288437d7ac751e5890fb561c512 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Mon, 13 Nov 2023 13:51:07 +0100 Subject: [PATCH 03/10] fix(BUX-250): setup go-datastore logger to log warning level by default --- client_options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client_options.go b/client_options.go index c774a5bc..9c75fbe0 100644 --- a/client_options.go +++ b/client_options.go @@ -68,7 +68,7 @@ func defaultClientOptions() *clientOptions { // Blank Datastore config dataStore: &dataStoreOptions{ ClientInterface: nil, - options: []datastore.ClientOps{}, + options: []datastore.ClientOps{datastore.WithLogger(&datastore.DatabaseLogWrapper{GormLoggerInterface: zLogger.NewGormLogger(false, 4)})}, }, // Default http client From 483fbf721066891979986292f159714515c93333 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Mon, 13 Nov 2023 14:24:38 +0100 Subject: [PATCH 04/10] fix(BUX-250): fix intenral tx flow;; fix write lock --- paymail_service_provider.go | 6 +-- record_tx.go | 25 ++++++++-- record_tx_strategy_external_incoming_tx.go | 10 ++-- record_tx_strategy_internal_incoming_tx.go | 16 ++---- record_tx_strategy_outgoing_tx.go | 57 ++++++++++++++-------- sync_tx_service.go | 11 +++-- 6 files changed, 77 insertions(+), 48 deletions(-) diff --git a/paymail_service_provider.go b/paymail_service_provider.go index 3ce95cc5..20fd729c 100644 --- a/paymail_service_provider.go +++ b/paymail_service_provider.go @@ -156,15 +156,15 @@ func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context, metadata[ReferenceIDField] = p2pTx.Reference // Record the transaction - rts, err := getRecordTxStrategy(ctx, p.client, "", p2pTx.Hex, "") + rts, err := getIncomingTxRecordStrategy(ctx, p.client, p2pTx.Hex) if err != nil { return nil, err } - rts.(recordIncomingTxStrategy).ForceBroadcast(true) + rts.ForceBroadcast(true) if p2pTx.Beef != "" { - rts.(recordIncomingTxStrategy).FailOnBroadcastError(true) + rts.FailOnBroadcastError(true) } transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata)) diff --git a/record_tx.go b/record_tx.go index 87914919..525051fc 100644 --- a/record_tx.go +++ b/record_tx.go @@ -8,17 +8,19 @@ import ( type recordTxStrategy interface { TxID() string + LockKey() string Validate() error Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) } type recordIncomingTxStrategy interface { + recordTxStrategy ForceBroadcast(force bool) FailOnBroadcastError(forceFail bool) } func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) { - unlock := waitForRecordTxWriteLock(ctx, c, strategy.TxID()) + unlock := waitForRecordTxWriteLock(ctx, c, strategy.LockKey()) defer unlock() return strategy.Execute(ctx, c, opts) @@ -53,13 +55,13 @@ func getOutgoingTxRecordStrategy(xPubKey, txHex, draftID string) recordTxStrateg } } -func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordTxStrategy, error) { +func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordIncomingTxStrategy, error) { tx, err := getTransactionByHex(ctx, txHex, c.DefaultModelOptions()...) if err != nil { return nil, err } - var rts recordTxStrategy + var rts recordIncomingTxStrategy if tx != nil { rts = &internalIncomingTx{ @@ -84,15 +86,28 @@ func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string // Create the lock and set the release for after the function completes // Waits for the moment when the transaction is unlocked and creates a new lock // Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming + + lockKey := fmt.Sprintf(lockKeyRecordTx, key) + + // TODO: change to DEBUG level log when we will support it + c.Logger().Info(ctx, fmt.Sprintf("try add write lock %s", lockKey)) + for { + unlock, err = newWriteLock( - ctx, fmt.Sprintf(lockKeyRecordTx, key), c.Cachestore(), + ctx, lockKey, c.Cachestore(), ) if err == nil { + // TODO: change to DEBUG level log when we will support it + c.Logger().Info(ctx, fmt.Sprintf("added write lock %s", lockKey)) break } time.Sleep(time.Second * 1) } - return unlock + return func() { + // TODO: change to DEBUG level log when we will support it + c.Logger().Info(ctx, fmt.Sprintf("unlock %s", lockKey)) + unlock() + } } diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go index f1cf500e..3fa50263 100644 --- a/record_tx_strategy_external_incoming_tx.go +++ b/record_tx_strategy_external_incoming_tx.go @@ -36,7 +36,7 @@ func (strategy *externalIncomingTx) Execute(ctx context.Context, c ClientInterfa logger. Error(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %s, TxID: %s", err, transaction.ID)) - return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w, TxID: %s", err, transaction.ID) + return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w", err) } } @@ -62,6 +62,10 @@ func (strategy *externalIncomingTx) TxID() string { return btTx.TxID() } +func (strategy *externalIncomingTx) LockKey() string { + return fmt.Sprintf("incoming-%s", strategy.TxID()) +} + func (strategy *externalIncomingTx) ForceBroadcast(force bool) { strategy.broadcastNow = force } @@ -120,8 +124,8 @@ func _hydrateExternalWithSync(tx *Transaction) { // to simplfy: broadcast every external incoming txs sync.BroadcastStatus = SyncStatusReady - sync.P2PStatus = SyncStatusSkipped // the owner of the Tx should have already notified paymail providers - //sync.SyncStatus = SyncStatusReady + sync.P2PStatus = SyncStatusSkipped // the sender of the Tx should have already notified us + sync.SyncStatus = SyncStatusPending // wait until transaciton will be broadcasted // Use the same metadata sync.Metadata = tx.Metadata diff --git a/record_tx_strategy_internal_incoming_tx.go b/record_tx_strategy_internal_incoming_tx.go index 7e5a3ac8..0945800d 100644 --- a/record_tx_strategy_internal_incoming_tx.go +++ b/record_tx_strategy_internal_incoming_tx.go @@ -21,7 +21,7 @@ func (strategy *internalIncomingTx) Execute(ctx context.Context, c ClientInterfa // process transaction := strategy.Tx syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...) - if err != nil { + if err != nil || syncTx == nil { return nil, fmt.Errorf("InternalIncomingTx.Execute(): getting syncTx failed. Reason: %w", err) } @@ -54,6 +54,10 @@ func (strategy *internalIncomingTx) TxID() string { return strategy.Tx.ID } +func (strategy *internalIncomingTx) LockKey() string { + return fmt.Sprintf("incoming-%s", strategy.Tx.ID) +} + func (strategy *internalIncomingTx) ForceBroadcast(force bool) { strategy.broadcastNow = force } @@ -79,16 +83,6 @@ func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerIn logger. Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, transaction.ID)) - // TODO: do I really need this? - if syncTx.BroadcastStatus == SyncStatusSkipped { // revert status to ready after fail to re-run broadcasting, this can happen when we received internal BEEF tx - syncTx.BroadcastStatus = SyncStatusReady - - if err = syncTx.Save(ctx); err != nil { - logger. - Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Skipped to Ready failed. Reason: %s, TxID: %s", err, transaction.ID)) - } - } - // ignore broadcast error - will be repeted by task manager return nil } diff --git a/record_tx_strategy_outgoing_tx.go b/record_tx_strategy_outgoing_tx.go index b33efe3e..db9f560f 100644 --- a/record_tx_strategy_outgoing_tx.go +++ b/record_tx_strategy_outgoing_tx.go @@ -15,58 +15,73 @@ type outgoingTx struct { XPubKey string } -func (tx *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { +func (strategy *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { logger := c.Logger() + logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start, TxID: %s", strategy.TxID())) - // process - transaction, err := _createOutgoingTxToRecord(ctx, tx, c, opts) - - logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start, TxID: %s", transaction.ID)) - + // create + transaction, err := _createOutgoingTxToRecord(ctx, strategy, c, opts) if err != nil { return nil, fmt.Errorf("OutgoingTx.Execute(): creation of outgoing tx failed. Reason: %w", err) } + if err = transaction.Save(ctx); err != nil { + return nil, fmt.Errorf("OutgoingTx.Execute(): saving of Transaction failed. Reason: %w", err) + } + + // process if transaction.syncTransaction.P2PStatus == SyncStatusReady { if err = _outgoingNotifyP2p(ctx, logger, transaction); err != nil { - return nil, err // reject transaction if P2P notification failed + // reject transaction if P2P notification failed + logger.Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): transaction rejected by P2P provider, try to revert transaction. Reason: %s", err)) + + if revertErr := c.RevertTransaction(ctx, transaction.ID); revertErr != nil { + logger.Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): FATAL! Reverting transaction after failed P2P notification failed. Reason: %s", err)) + } + + return nil, err } } - if transaction.syncTransaction.BroadcastStatus == SyncStatusReady { - _outgoingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcasted by cron task + // get newest syncTx from DB - if it's an internal tx it could be broadcasted by us already + syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...) + if err != nil || syncTx == nil { + return nil, fmt.Errorf("OutgoingTx.Execute(): getting syncTx failed. Reason: %w", err) } - // record - if err = transaction.Save(ctx); err != nil { - return nil, fmt.Errorf("OutgoingTx.Execute(): saving of Transaction failed. Reason: %w", err) + if syncTx.BroadcastStatus == SyncStatusReady { + _outgoingBroadcast(ctx, logger, transaction) // ignore error } logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): complete, TxID: %s", transaction.ID)) return transaction, nil } -func (tx outgoingTx) Validate() error { - if tx.Hex == "" { +func (strategy *outgoingTx) Validate() error { + if strategy.Hex == "" { return ErrMissingFieldHex } - if tx.RelatedDraftID == "" { + if strategy.RelatedDraftID == "" { return errors.New("empty RelatedDraftID") } - if tx.XPubKey == "" { - return errors.New("empty xPubKey") // is it required ? + if strategy.XPubKey == "" { + return errors.New("empty xPubKey") } return nil // is valid } -func (tx outgoingTx) TxID() string { - btTx, _ := bt.NewTxFromString(tx.Hex) +func (strategy *outgoingTx) TxID() string { + btTx, _ := bt.NewTxFromString(strategy.Hex) return btTx.TxID() } +func (strategy *outgoingTx) LockKey() string { + return fmt.Sprintf("outgoing-%s", strategy.TxID()) +} + func _createOutgoingTxToRecord(ctx context.Context, oTx *outgoingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { // Create NEW transaction model newOpts := c.DefaultModelOptions(append(opts, WithXPub(oTx.XPubKey), New())...) @@ -124,7 +139,7 @@ func _hydrateOutgoingWithSync(tx *Transaction) { // setup synchronization sync.BroadcastStatus = _getBroadcastSyncStatus(tx) sync.P2PStatus = _getP2pSyncStatus(tx) - //sync.SyncStatus = SyncStatusReady + sync.SyncStatus = SyncStatusPending // wait until transaction is broadcasted or P2P provider is notified sync.Metadata = tx.Metadata @@ -186,7 +201,7 @@ func _outgoingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil { // ignore error, transaction will be broadcasted by cron task logger. - Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID)) + Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, tx.ID)) } else { logger. Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcast complete, TxID: %s", tx.ID)) diff --git a/sync_tx_service.go b/sync_tx_service.go index 5f3da9eb..939e1d86 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -194,11 +194,6 @@ func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) erro StatusMessage: message, }) - // Update the P2P status - if syncTx.P2PStatus == SyncStatusPending { - syncTx.P2PStatus = SyncStatusReady - } - // Update sync status to be ready now if syncTx.SyncStatus == SyncStatusPending { syncTx.SyncStatus = SyncStatusReady @@ -357,6 +352,12 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac // Save the record syncTx.P2PStatus = SyncStatusComplete + + // Update sync status to be ready now + if syncTx.SyncStatus == SyncStatusPending { + syncTx.SyncStatus = SyncStatusReady + } + if err = syncTx.Save(ctx); err != nil { _bailAndSaveSyncTransaction( ctx, syncTx, SyncStatusError, syncActionP2P, "internal", err.Error(), From 35a96f105b255426f7706400d81a664197091046 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Mon, 13 Nov 2023 14:25:08 +0100 Subject: [PATCH 05/10] fix(BUX-250): remove p2pNotify cron job --- client_options.go | 1 - definitions.go | 1 - model_sync_transactions.go | 26 +++----------------------- sync_tx_service.go | 31 ------------------------------- tasks.go | 12 ------------ 5 files changed, 3 insertions(+), 68 deletions(-) diff --git a/client_options.go b/client_options.go index 9c75fbe0..03a870b4 100644 --- a/client_options.go +++ b/client_options.go @@ -110,7 +110,6 @@ func defaultClientOptions() *clientOptions { ModelDraftTransaction.String() + "_clean_up": taskIntervalDraftCleanup, ModelIncomingTransaction.String() + "_process": taskIntervalProcessIncomingTxs, ModelSyncTransaction.String() + "_" + syncActionBroadcast: taskIntervalSyncActionBroadcast, - ModelSyncTransaction.String() + "_" + syncActionP2P: taskIntervalSyncActionP2P, ModelSyncTransaction.String() + "_" + syncActionSync: taskIntervalSyncActionSync, ModelTransaction.String() + "_" + TransactionActionCheck: taskIntervalTransactionCheck, }, diff --git a/definitions.go b/definitions.go index 73931ccf..0c70e7e5 100644 --- a/definitions.go +++ b/definitions.go @@ -34,7 +34,6 @@ const ( taskIntervalMonitorCheck = defaultMonitorHeartbeat * time.Second // Default task time for cron jobs (seconds) taskIntervalProcessIncomingTxs = 30 * time.Second // Default task time for cron jobs (seconds) taskIntervalSyncActionBroadcast = 30 * time.Second // Default task time for cron jobs (seconds) - taskIntervalSyncActionP2P = 35 * time.Second // Default task time for cron jobs (seconds) taskIntervalSyncActionSync = 40 * time.Second // Default task time for cron jobs (seconds) taskIntervalTransactionCheck = 60 * time.Second // Default task time for cron jobs (seconds) ) diff --git a/model_sync_transactions.go b/model_sync_transactions.go index 0a2c2904..5232c651 100644 --- a/model_sync_transactions.go +++ b/model_sync_transactions.go @@ -143,6 +143,7 @@ func (m *SyncTransaction) RegisterTasks() error { return nil } + // Sync with chain - task // Register the task locally (cron task - set the defaults) syncTask := m.Name() + "_" + syncActionSync ctx := context.Background() @@ -171,6 +172,7 @@ func (m *SyncTransaction) RegisterTasks() error { return err } + // Broadcast - task // Register the task locally (cron task - set the defaults) broadcastTask := m.Name() + "_" + syncActionBroadcast @@ -197,27 +199,5 @@ func (m *SyncTransaction) RegisterTasks() error { return err } - // Register the task locally (cron task - set the defaults) - p2pTask := m.Name() + "_" + syncActionP2P - - // Register the task - if err = tm.RegisterTask(&taskmanager.Task{ - Name: p2pTask, - RetryLimit: 1, - Handler: func(client ClientInterface) error { - if taskErr := taskNotifyP2P(ctx, client.Logger(), WithClient(client)); taskErr != nil { - client.Logger().Error(ctx, "error running "+p2pTask+" task: "+taskErr.Error()) - } - return nil - }, - }); err != nil { - return err - } - - // Run the task periodically - return tm.RunTask(ctx, &taskmanager.TaskOptions{ - Arguments: []interface{}{m.Client()}, - RunEveryPeriod: m.Client().GetTaskPeriod(p2pTask), - TaskName: p2pTask, - }) + return nil } diff --git a/sync_tx_service.go b/sync_tx_service.go index 939e1d86..c11a20c9 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -96,37 +96,6 @@ func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts return nil } -// processP2PTransactions will process transactions for p2p notifications -func processP2PTransactions(ctx context.Context, maxTransactions int, opts ...ModelOps) error { - queryParams := &datastore.QueryParams{ - Page: 1, - PageSize: maxTransactions, - OrderByField: "created_at", - SortDirection: "asc", - } - - // Get x records - records, err := getTransactionsToNotifyP2P( - ctx, queryParams, opts..., - ) - if err != nil { - return err - } else if len(records) == 0 { - return nil - } - - // Process the incoming transaction - for index := range records { - if err = processP2PTransaction( - ctx, records[index], nil, - ); err != nil { - return err - } - } - - return nil -} - // broadcastSyncTransaction will broadcast transaction related to syncTx record func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) error { // Successfully capture any panics, convert to readable string and log the error diff --git a/tasks.go b/tasks.go index dd771f4a..10c29beb 100644 --- a/tasks.go +++ b/tasks.go @@ -79,18 +79,6 @@ func taskBroadcastTransactions(ctx context.Context, logClient zLogger.GormLogger return err } -// taskNotifyP2P will notify any p2p paymail providers -func taskNotifyP2P(ctx context.Context, logClient zLogger.GormLoggerInterface, opts ...ModelOps) error { - - logClient.Info(ctx, "running notify p2p paymail provider(s) task...") - - err := processP2PTransactions(ctx, 10, opts...) - if err == nil || errors.Is(err, datastore.ErrNoResults) { - return nil - } - return err -} - // taskSyncTransactions will sync any transactions func taskSyncTransactions(ctx context.Context, logClient zLogger.GormLoggerInterface, opts ...ModelOps) error { From f8a1f1dbc09c53089a10496feb18eb97c5dec49a Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Tue, 14 Nov 2023 01:17:27 +0100 Subject: [PATCH 06/10] fix(BUX-250): cancel draftTx on revereting transaciton --- action_transaction.go | 7 +++++++ record_tx.go | 1 + 2 files changed, 8 insertions(+) diff --git a/action_transaction.go b/action_transaction.go index 53260c6b..218ca51a 100644 --- a/action_transaction.go +++ b/action_transaction.go @@ -455,6 +455,12 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error { } } + // cancel draft tx + draftTransaction.Status = DraftStatusCanceled + if err = draftTransaction.Save(ctx); err != nil { + return err + } + // cancel sync transaction var syncTransaction *SyncTransaction if syncTransaction, err = GetSyncTransactionByID(ctx, transaction.ID, c.DefaultModelOptions()...); err != nil { @@ -481,6 +487,7 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error { transaction.XpubOutputValue = XpubOutputValue{"reverted": 0} transaction.DeletedAt.Valid = true transaction.DeletedAt.Time = time.Now() + err = transaction.Save(ctx) // update existing record return err diff --git a/record_tx.go b/record_tx.go index 525051fc..46924430 100644 --- a/record_tx.go +++ b/record_tx.go @@ -88,6 +88,7 @@ func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string // Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming lockKey := fmt.Sprintf(lockKeyRecordTx, key) + c.Logger().Info(ctx, lockKey) // TODO: change to DEBUG level log when we will support it c.Logger().Info(ctx, fmt.Sprintf("try add write lock %s", lockKey)) From eb0818374c1c4bdc47a3e007c81e834761e69a08 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Tue, 14 Nov 2023 01:56:37 +0100 Subject: [PATCH 07/10] fix(BUX-250): fix options test --- client_options_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client_options_test.go b/client_options_test.go index 31f0c043..6c250bf1 100644 --- a/client_options_test.go +++ b/client_options_test.go @@ -122,7 +122,7 @@ func TestClient_defaultModelOptions(t *testing.T) { require.NotNil(t, dco.dataStore) require.Nil(t, dco.dataStore.ClientInterface) require.NotNil(t, dco.dataStore.options) - assert.Equal(t, 0, len(dco.dataStore.options)) + assert.Equal(t, 1, len(dco.dataStore.options)) require.NotNil(t, dco.newRelic) From 125df49f7766520629ff6f39e2a4c04051de8040 Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Tue, 14 Nov 2023 16:09:09 +0100 Subject: [PATCH 08/10] fix(BUX-250): update go-paymail to 0.7.1 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8e6744c0..edc2304d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/99designs/gqlgen v0.17.40 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/bitcoin-sv/go-broadcast-client v0.9.0 - github.com/bitcoin-sv/go-paymail v0.7.0 + github.com/bitcoin-sv/go-paymail v0.7.1 github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 github.com/bitcoinschema/go-map v0.1.0 github.com/centrifugal/centrifuge-go v0.10.2 diff --git a/go.sum b/go.sum index e80950e9..607a204a 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/aws/aws-sdk-go v1.43.45 h1:2708Bj4uV+ym62MOtBnErm/CDX61C4mFe9V2gXy1caE= github.com/bitcoin-sv/go-broadcast-client v0.9.0 h1:6oR1th7TppFtWxcfCHFvEM0XnwTvuO4yTx1tg3rlOIc= github.com/bitcoin-sv/go-broadcast-client v0.9.0/go.mod h1:hami7qUkK0eoZolDXeo4tOLD16qbDXuZRV7BQ1RDzaE= -github.com/bitcoin-sv/go-paymail v0.7.0 h1:QcHsWp+2kgxBsiwM2LGevaNO0PodAk4L5bHABI9flX0= -github.com/bitcoin-sv/go-paymail v0.7.0/go.mod h1:i0mTFBj3hfKEZ1tJUgUfV38b3jJVFgyeIBGR0c9lqOI= +github.com/bitcoin-sv/go-paymail v0.7.1 h1:jrxObr1NZPr1p4R4vn1rSwze9Ceg+hYu3dOBeZbqYnE= +github.com/bitcoin-sv/go-paymail v0.7.1/go.mod h1:i0mTFBj3hfKEZ1tJUgUfV38b3jJVFgyeIBGR0c9lqOI= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5 h1:Sgh5Eb746Zck/46rFDrZZEXZWyO53fMuWYhNoZa1tck= github.com/bitcoinschema/go-bitcoin/v2 v2.0.5/go.mod h1:JjO1ivfZv6vhK0uAXzyH08AAHlzNMAfnyK1Fiv9r4ZA= github.com/bitcoinschema/go-bob v0.4.0 h1:adsAEboLQCg0D6e9vwcJUJEJScszsouAYCYu35UAiGo= From ad47ebe8daffe5296bc54c7e2157825ded1bc12f Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Tue, 14 Nov 2023 16:10:40 +0100 Subject: [PATCH 09/10] fix(BUX-250): change syncTx task interval - run it every 2 minutes --- definitions.go | 2 +- locks.go | 10 +++++----- model_sync_transactions.go | 2 +- sync_tx_service.go | 9 +-------- tasks.go | 14 ++++++++++++-- 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/definitions.go b/definitions.go index 0c70e7e5..451e1389 100644 --- a/definitions.go +++ b/definitions.go @@ -34,7 +34,7 @@ const ( taskIntervalMonitorCheck = defaultMonitorHeartbeat * time.Second // Default task time for cron jobs (seconds) taskIntervalProcessIncomingTxs = 30 * time.Second // Default task time for cron jobs (seconds) taskIntervalSyncActionBroadcast = 30 * time.Second // Default task time for cron jobs (seconds) - taskIntervalSyncActionSync = 40 * time.Second // Default task time for cron jobs (seconds) + taskIntervalSyncActionSync = 120 * time.Second // Default task time for cron jobs (seconds) taskIntervalTransactionCheck = 60 * time.Second // Default task time for cron jobs (seconds) ) diff --git a/locks.go b/locks.go index 41516c4d..81c434b1 100644 --- a/locks.go +++ b/locks.go @@ -11,11 +11,11 @@ const ( lockKeyProcessBroadcastTx = "process-broadcast-transaction-%s" // + Tx ID lockKeyProcessIncomingTx = "process-incoming-transaction-%s" // + Tx ID lockKeyProcessP2PTx = "process-p2p-transaction-%s" // + Tx ID - lockKeyProcessSyncTx = "process-sync-transaction-%s" // + Tx ID - lockKeyProcessXpub = "action-xpub-id-%s" // + Xpub ID - lockKeyRecordBlockHeader = "action-record-block-header-%s" // + Hash id - lockKeyRecordTx = "action-record-transaction-%s" // + Tx ID - lockKeyReserveUtxo = "utxo-reserve-xpub-id-%s" // + Xpub ID + lockKeyProcessSyncTx = "process-sync-transaction-task" + lockKeyProcessXpub = "action-xpub-id-%s" // + Xpub ID + lockKeyRecordBlockHeader = "action-record-block-header-%s" // + Hash id + lockKeyRecordTx = "action-record-transaction-%s" // + Tx ID + lockKeyReserveUtxo = "utxo-reserve-xpub-id-%s" // + Xpub ID ) // newWriteLock will take care of creating a lock and defer diff --git a/model_sync_transactions.go b/model_sync_transactions.go index 5232c651..6a4b8810 100644 --- a/model_sync_transactions.go +++ b/model_sync_transactions.go @@ -153,7 +153,7 @@ func (m *SyncTransaction) RegisterTasks() error { Name: syncTask, RetryLimit: 1, Handler: func(client ClientInterface) error { - if taskErr := taskSyncTransactions(ctx, client.Logger(), WithClient(client)); taskErr != nil { + if taskErr := taskSyncTransactions(ctx, client, WithClient(client)); taskErr != nil { client.Logger().Error(ctx, "error running "+syncTask+" task: "+taskErr.Error()) } return nil diff --git a/sync_tx_service.go b/sync_tx_service.go index c11a20c9..b7b714db 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -189,14 +189,7 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact // Successfully capture any panics, convert to readable string and log the error defer recoverAndLog(ctx, syncTx.client.Logger()) - // Create the lock and set the release for after the function completes - unlock, err := newWriteLock( - ctx, fmt.Sprintf(lockKeyProcessSyncTx, syncTx.GetID()), syncTx.Client().Cachestore(), - ) - defer unlock() - if err != nil { - return err - } + var err error // Get the transaction if transaction == nil { diff --git a/tasks.go b/tasks.go index 10c29beb..2ca536f3 100644 --- a/tasks.go +++ b/tasks.go @@ -80,11 +80,21 @@ func taskBroadcastTransactions(ctx context.Context, logClient zLogger.GormLogger } // taskSyncTransactions will sync any transactions -func taskSyncTransactions(ctx context.Context, logClient zLogger.GormLoggerInterface, opts ...ModelOps) error { +func taskSyncTransactions(ctx context.Context, c ClientInterface, opts ...ModelOps) error { + logClient := c.Logger() logClient.Info(ctx, "running sync transaction(s) task...") - err := processSyncTransactions(ctx, 10, opts...) + // Prevent concurrent running + unlock, err := newWriteLock( + ctx, lockKeyProcessSyncTx, c.Cachestore(), + ) + defer unlock() + if err != nil { + return err + } + + err = processSyncTransactions(ctx, 100, opts...) if err == nil || errors.Is(err, datastore.ErrNoResults) { return nil } From 5a222e2c870448d7e328a20d18aa70822595249c Mon Sep 17 00:00:00 2001 From: arkadiuszos4chain Date: Tue, 14 Nov 2023 16:27:44 +0100 Subject: [PATCH 10/10] fix(BUX-250): add warrinng if sync task lasts too long --- tasks.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tasks.go b/tasks.go index 2ca536f3..cd1d5d59 100644 --- a/tasks.go +++ b/tasks.go @@ -91,7 +91,8 @@ func taskSyncTransactions(ctx context.Context, c ClientInterface, opts ...ModelO ) defer unlock() if err != nil { - return err + logClient.Warn(ctx, "cannot run sync transaction(s) task, previous run is not complete yet...") + return nil } err = processSyncTransactions(ctx, 100, opts...)