Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

fix(BUX-250): record internal tx #469

Merged
merged 10 commits into from
Nov 15, 2023
7 changes: 7 additions & 0 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
}
}

// cancel draft tx
draftTransaction.Status = DraftStatusCanceled
if err = draftTransaction.Save(ctx); err != nil {
return err
}

// cancel sync transaction
var syncTransaction *SyncTransaction
if syncTransaction, err = GetSyncTransactionByID(ctx, transaction.ID, c.DefaultModelOptions()...); err != nil {
Expand All @@ -481,6 +487,7 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error {
transaction.XpubOutputValue = XpubOutputValue{"reverted": 0}
transaction.DeletedAt.Valid = true
transaction.DeletedAt.Time = time.Now()

err = transaction.Save(ctx) // update existing record

return err
Expand Down
3 changes: 1 addition & 2 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func defaultClientOptions() *clientOptions {
// Blank Datastore config
dataStore: &dataStoreOptions{
ClientInterface: nil,
options: []datastore.ClientOps{},
options: []datastore.ClientOps{datastore.WithLogger(&datastore.DatabaseLogWrapper{GormLoggerInterface: zLogger.NewGormLogger(false, 4)})},
},

// Default http client
Expand Down Expand Up @@ -110,7 +110,6 @@ func defaultClientOptions() *clientOptions {
ModelDraftTransaction.String() + "_clean_up": taskIntervalDraftCleanup,
ModelIncomingTransaction.String() + "_process": taskIntervalProcessIncomingTxs,
ModelSyncTransaction.String() + "_" + syncActionBroadcast: taskIntervalSyncActionBroadcast,
ModelSyncTransaction.String() + "_" + syncActionP2P: taskIntervalSyncActionP2P,
ModelSyncTransaction.String() + "_" + syncActionSync: taskIntervalSyncActionSync,
ModelTransaction.String() + "_" + TransactionActionCheck: taskIntervalTransactionCheck,
},
Expand Down
2 changes: 1 addition & 1 deletion client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestClient_defaultModelOptions(t *testing.T) {
require.NotNil(t, dco.dataStore)
require.Nil(t, dco.dataStore.ClientInterface)
require.NotNil(t, dco.dataStore.options)
assert.Equal(t, 0, len(dco.dataStore.options))
assert.Equal(t, 1, len(dco.dataStore.options))

require.NotNil(t, dco.newRelic)

Expand Down
3 changes: 1 addition & 2 deletions definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const (
taskIntervalMonitorCheck = defaultMonitorHeartbeat * time.Second // Default task time for cron jobs (seconds)
taskIntervalProcessIncomingTxs = 30 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionBroadcast = 30 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionP2P = 35 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionSync = 40 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionSync = 120 * time.Second // Default task time for cron jobs (seconds)
taskIntervalTransactionCheck = 60 * time.Second // Default task time for cron jobs (seconds)
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const (
lockKeyProcessBroadcastTx = "process-broadcast-transaction-%s" // + Tx ID
lockKeyProcessIncomingTx = "process-incoming-transaction-%s" // + Tx ID
lockKeyProcessP2PTx = "process-p2p-transaction-%s" // + Tx ID
lockKeyProcessSyncTx = "process-sync-transaction-%s" // + Tx ID
lockKeyProcessXpub = "action-xpub-id-%s" // + Xpub ID
lockKeyRecordBlockHeader = "action-record-block-header-%s" // + Hash id
lockKeyRecordTx = "action-record-transaction-%s" // + Tx ID
lockKeyReserveUtxo = "utxo-reserve-xpub-id-%s" // + Xpub ID
lockKeyProcessSyncTx = "process-sync-transaction-task"
lockKeyProcessXpub = "action-xpub-id-%s" // + Xpub ID
lockKeyRecordBlockHeader = "action-record-block-header-%s" // + Hash id
lockKeyRecordTx = "action-record-transaction-%s" // + Tx ID
lockKeyReserveUtxo = "utxo-reserve-xpub-id-%s" // + Xpub ID
)

// newWriteLock will take care of creating a lock and defer
Expand Down
28 changes: 4 additions & 24 deletions model_sync_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (m *SyncTransaction) RegisterTasks() error {
return nil
}

// Sync with chain - task
// Register the task locally (cron task - set the defaults)
syncTask := m.Name() + "_" + syncActionSync
ctx := context.Background()
Expand All @@ -152,7 +153,7 @@ func (m *SyncTransaction) RegisterTasks() error {
Name: syncTask,
RetryLimit: 1,
Handler: func(client ClientInterface) error {
if taskErr := taskSyncTransactions(ctx, client.Logger(), WithClient(client)); taskErr != nil {
if taskErr := taskSyncTransactions(ctx, client, WithClient(client)); taskErr != nil {
client.Logger().Error(ctx, "error running "+syncTask+" task: "+taskErr.Error())
}
return nil
Expand All @@ -171,6 +172,7 @@ func (m *SyncTransaction) RegisterTasks() error {
return err
}

// Broadcast - task
// Register the task locally (cron task - set the defaults)
broadcastTask := m.Name() + "_" + syncActionBroadcast

Expand All @@ -197,27 +199,5 @@ func (m *SyncTransaction) RegisterTasks() error {
return err
}

// Register the task locally (cron task - set the defaults)
p2pTask := m.Name() + "_" + syncActionP2P

// Register the task
if err = tm.RegisterTask(&taskmanager.Task{
Name: p2pTask,
RetryLimit: 1,
Handler: func(client ClientInterface) error {
if taskErr := taskNotifyP2P(ctx, client.Logger(), WithClient(client)); taskErr != nil {
client.Logger().Error(ctx, "error running "+p2pTask+" task: "+taskErr.Error())
}
return nil
},
}); err != nil {
return err
}

// Run the task periodically
return tm.RunTask(ctx, &taskmanager.TaskOptions{
Arguments: []interface{}{m.Client()},
RunEveryPeriod: m.Client().GetTaskPeriod(p2pTask),
TaskName: p2pTask,
})
return nil
}
8 changes: 6 additions & 2 deletions paymail_service_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context,
metadata[ReferenceIDField] = p2pTx.Reference

// Record the transaction
rts, err := getRecordTxStrategy(ctx, p.client, "", p2pTx.Hex, "")
rts, err := getIncomingTxRecordStrategy(ctx, p.client, p2pTx.Hex)
if err != nil {
return nil, err
}

rts.(recordIncomingTxStrategy).ForceBroadcast(true)
rts.ForceBroadcast(true)

if p2pTx.Beef != "" {
rts.FailOnBroadcastError(true)
}

transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata))
if err != nil {
Expand Down
31 changes: 24 additions & 7 deletions record_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ import (

type recordTxStrategy interface {
TxID() string
LockKey() string
Validate() error
Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error)
}

type recordIncomingTxStrategy interface {
recordTxStrategy
ForceBroadcast(force bool)
FailOnBroadcastError(forceFail bool)
}

func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) {
unlock := waitForRecordTxWriteLock(ctx, c, strategy.TxID())
unlock := waitForRecordTxWriteLock(ctx, c, strategy.LockKey())
defer unlock()

return strategy.Execute(ctx, c, opts)
Expand Down Expand Up @@ -52,23 +55,23 @@ func getOutgoingTxRecordStrategy(xPubKey, txHex, draftID string) recordTxStrateg
}
}

func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordTxStrategy, error) {
func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordIncomingTxStrategy, error) {
tx, err := getTransactionByHex(ctx, txHex, c.DefaultModelOptions()...)
if err != nil {
return nil, err
}

var rts recordTxStrategy
var rts recordIncomingTxStrategy

if tx != nil {
rts = &internalIncomingTx{
Tx: tx,
BroadcastNow: false,
broadcastNow: false,
}
} else {
rts = &externalIncomingTx{
Hex: txHex,
BroadcastNow: false,
broadcastNow: false,
}
}

Expand All @@ -83,15 +86,29 @@ func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string
// Create the lock and set the release for after the function completes
// Waits for the moment when the transaction is unlocked and creates a new lock
// Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming

lockKey := fmt.Sprintf(lockKeyRecordTx, key)
c.Logger().Info(ctx, lockKey)

// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("try add write lock %s", lockKey))

for {

unlock, err = newWriteLock(
ctx, fmt.Sprintf(lockKeyRecordTx, key), c.Cachestore(),
ctx, lockKey, c.Cachestore(),
)
if err == nil {
// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("added write lock %s", lockKey))
break
}
time.Sleep(time.Second * 1)
}

return unlock
return func() {
// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("unlock %s", lockKey))
unlock()
}
}
70 changes: 48 additions & 22 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,35 @@ import (
)

type externalIncomingTx struct {
Hex string
BroadcastNow bool // e.g. BEEF must be broadcasted now
Hex string
broadcastNow bool // e.g. BEEF must be broadcasted now
allowBroadcastErrors bool // only BEEF cannot allow for broadcast errors
}

func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
func (strategy *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
logger := c.Logger()

// process
if !tx.BroadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it
return _addTxToCheck(ctx, tx, c, opts)
if !strategy.broadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it
return _addTxToCheck(ctx, strategy, c, opts)
}

transaction, err := _createExternalTxToRecord(ctx, tx, c, opts)
transaction, err := _createExternalTxToRecord(ctx, strategy, c, opts)
if err != nil {
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): creation of external incoming tx failed. Reason: %w", err)
}

logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID))

if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
_externalIncomingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcaset in a cron task
if strategy.broadcastNow || transaction.syncTransaction.BroadcastStatus == SyncStatusReady {

err := _externalIncomingBroadcast(ctx, logger, transaction, strategy.allowBroadcastErrors)
if err != nil {
logger.
Error(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %s, TxID: %s", err, transaction.ID))

return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w", err)
}
}

// record
Expand All @@ -41,21 +49,29 @@ func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, op
return transaction, nil
}

func (tx *externalIncomingTx) Validate() error {
if tx.Hex == "" {
func (strategy *externalIncomingTx) Validate() error {
if strategy.Hex == "" {
return ErrMissingFieldHex
}

return nil // is valid
}

func (tx *externalIncomingTx) TxID() string {
btTx, _ := bt.NewTxFromString(tx.Hex)
func (strategy *externalIncomingTx) TxID() string {
btTx, _ := bt.NewTxFromString(strategy.Hex)
return btTx.TxID()
}

func (tx *externalIncomingTx) ForceBroadcast(force bool) {
tx.BroadcastNow = force
func (strategy *externalIncomingTx) LockKey() string {
return fmt.Sprintf("incoming-%s", strategy.TxID())
}

func (strategy *externalIncomingTx) ForceBroadcast(force bool) {
strategy.broadcastNow = force
}

func (strategy *externalIncomingTx) FailOnBroadcastError(forceFail bool) {
strategy.allowBroadcastErrors = !forceFail
}

func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) {
Expand Down Expand Up @@ -108,24 +124,34 @@ func _hydrateExternalWithSync(tx *Transaction) {
// to simplfy: broadcast every external incoming txs
sync.BroadcastStatus = SyncStatusReady

sync.P2PStatus = SyncStatusSkipped // the owner of the Tx should have already notified paymail providers
//sync.SyncStatus = SyncStatusReady
sync.P2PStatus = SyncStatusSkipped // the sender of the Tx should have already notified us
sync.SyncStatus = SyncStatusPending // wait until transaciton will be broadcasted

// Use the same metadata
sync.Metadata = tx.Metadata
sync.transaction = tx
tx.syncTransaction = sync
}

func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) {
func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction, allowErrors bool) error {
logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start broadcast, TxID: %s", tx.ID))

if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil {
// ignore error, transaction will be broadcaset in a cron task
logger.
Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID))
} else {
err := broadcastSyncTransaction(ctx, tx.syncTransaction)

if err == nil {
logger.
Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcast complete, TxID: %s", tx.ID))

return nil
}

if allowErrors {
logger.
Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, tx.ID))

// ignore error, transaction will be broadcaset in a cron task
return nil
}

return err
}
Loading