Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #488 from BuxOrg/feat-358-remove-check-tx-job
Browse files Browse the repository at this point in the history
feat(BUX-358): remove check tx job
  • Loading branch information
arkadiuszos4chain authored Dec 1, 2023
2 parents b65bc68 + 48b8d70 commit 5acd0e9
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 289 deletions.
108 changes: 5 additions & 103 deletions action_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,122 +35,24 @@ func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID
return recordTransaction(ctx, c, rts, opts...)
}

// RecordRawTransaction will parse the transaction and save it into the Datastore directly, without any checks
//
// RecordRawTransaction will parse the transaction and save it into the Datastore directly, without any checks or broadcast but bux will ask network for information if transaction was mined
// The transaction is treat as external incoming transaction - transaction without a draft
// Only use this function when you know what you are doing!
//
// txHex is the raw transaction hex
// opts are model options and can include "metadata"
func (c *Client) RecordRawTransaction(ctx context.Context, txHex string,
opts ...ModelOps,
) (*Transaction, error) {
// Check for existing NewRelic transaction
ctx = c.GetOrStartTxn(ctx, "record_raw_transaction")

return c.recordTxHex(ctx, txHex, opts...)
}

// RecordMonitoredTransaction will parse the transaction and save it into the Datastore
//
// This function will try to record the transaction directly, without checking draft ids etc.
//
//nolint:nolintlint,unparam,gci // opts is the way, but not yet being used
func recordMonitoredTransaction(ctx context.Context, client ClientInterface, txHex string,
opts ...ModelOps,
) (*Transaction, error) {
// Check for existing NewRelic transaction
ctx = client.GetOrStartTxn(ctx, "record_monitored_transaction")

transaction, err := client.recordTxHex(ctx, txHex, opts...)
if err != nil {
return nil, err
}

if transaction.BlockHash == "" {
// Create the sync transaction model
sync := newSyncTransaction(
transaction.GetID(),
transaction.Client().DefaultSyncConfig(),
transaction.GetOptions(true)...,
)
sync.BroadcastStatus = SyncStatusSkipped
sync.P2PStatus = SyncStatusSkipped

// Use the same metadata
sync.Metadata = transaction.Metadata

// If all the options are skipped, do not make a new model (ignore the record)
if !sync.isSkipped() {
if err = sync.Save(ctx); err != nil {
return nil, err
}
}
}

return transaction, nil
}

func (c *Client) recordTxHex(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error) {
// Create the model & set the default options (gives options from client->model)
newOpts := c.DefaultModelOptions(append(opts, New())...)
transaction := newTransaction(txHex, newOpts...)

// Ensure that we have a transaction id (created from the txHex)
id := transaction.GetID()
if len(id) == 0 {
return nil, ErrMissingTxHex
}

// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
ctx, fmt.Sprintf(lockKeyRecordTx, id), c.Cachestore(),
)
defer unlock()
if err != nil {
return nil, err
}

// Logic moved from BeforeCreating hook - should be refactorized in next iteration

// If we are external and the user disabled incoming transaction checking, check outputs
if transaction.isExternal() && !transaction.Client().IsITCEnabled() {
// Check that the transaction has >= 1 known destination
if !transaction.TransactionBase.hasOneKnownDestination(ctx, transaction.Client(), transaction.GetOptions(false)...) {
return nil, ErrNoMatchingOutputs
}
}

// Process the UTXOs
if err = transaction.processUtxos(ctx); err != nil {
return nil, err
}

// Set the values from the inputs/outputs and draft tx
transaction.TotalValue, transaction.Fee = transaction.getValues()

// Add values
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))

// /Logic moved from BeforeCreating hook - should be refactorized in next iteration

allowUnknown := true
monitor := c.options.chainstate.Monitor()

if monitor != nil {
// do not register transactions we have nothing to do with
allowUnknown := monitor.AllowUnknownTransactions()
if transaction.XpubInIDs == nil && transaction.XpubOutIDs == nil && !allowUnknown {
return nil, ErrTransactionUnknown
}
allowUnknown = monitor.AllowUnknownTransactions()
}

// save the transaction model
if err = transaction.Save(ctx); err != nil {
return nil, err
}

// Return the response
return transaction, nil
return saveRawTransaction(ctx, c, allowUnknown, txHex, opts...)
}

// NewTransaction will create a new draft transaction and return it
Expand Down
1 change: 0 additions & 1 deletion client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func defaultClientOptions() *clientOptions {
ModelIncomingTransaction.String() + "_process": taskIntervalProcessIncomingTxs,
ModelSyncTransaction.String() + "_" + syncActionBroadcast: taskIntervalSyncActionBroadcast,
ModelSyncTransaction.String() + "_" + syncActionSync: taskIntervalSyncActionSync,
ModelTransaction.String() + "_" + TransactionActionCheck: taskIntervalTransactionCheck,
},
},

Expand Down
1 change: 0 additions & 1 deletion definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
taskIntervalProcessIncomingTxs = 30 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionBroadcast = 30 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionSync = 120 * time.Second // Default task time for cron jobs (seconds)
taskIntervalTransactionCheck = 60 * time.Second // Default task time for cron jobs (seconds)
)

// All the base models
Expand Down
3 changes: 0 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ var ErrTaskManagerNotLoaded = errors.New("taskmanager must be loaded")
// ErrTransactionNotParsed is when the transaction is not parsed but was expected
var ErrTransactionNotParsed = errors.New("transaction is not parsed")

// ErrTransactionUnknown is when the transaction is not linked to any account in our database
var ErrTransactionUnknown = errors.New("transaction is unknown")

// ErrNoMatchingOutputs is when the transaction does not match any known destinations
var ErrNoMatchingOutputs = errors.New("transaction outputs do not match any known destinations")

Expand Down
1 change: 0 additions & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ type TransactionService interface {
opts ...ModelOps) (*Transaction, error)
RecordRawTransaction(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
UpdateTransactionMetadata(ctx context.Context, xPubID, id string, metadata Metadata) (*Transaction, error)
recordTxHex(ctx context.Context, txHex string, opts ...ModelOps) (*Transaction, error)
RevertTransaction(ctx context.Context, id string) error
}

Expand Down
6 changes: 0 additions & 6 deletions model_draft_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,12 +1561,6 @@ func initSimpleTestCase(t *testing.T) (context.Context, ClientInterface, func())
transaction := newTransaction(testTxHex, append(client.DefaultModelOptions(), New())...)
err = transaction.processUtxos(ctx)
require.NoError(t, err)
transaction.TotalValue, transaction.Fee = transaction.getValues()

if transaction.TransactionBase.parsedTx != nil {
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))
}

err = transaction.Save(ctx)
require.NoError(t, err)
Expand Down
27 changes: 1 addition & 26 deletions model_incoming_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,28 +169,7 @@ func (m *IncomingTransaction) BeforeCreating(ctx context.Context) error {
}

// Check that the transaction has >= 1 known destination
if !m.TransactionBase.hasOneKnownDestination(ctx, m.Client(), m.GetOptions(false)...) {
return ErrNoMatchingOutputs
}

// Match a known destination
// todo: this can be optimized searching X records at a time vs loop->query->loop->query
matchingOutput := false
lockingScript := ""
opts := m.GetOptions(false)
for index := range m.TransactionBase.parsedTx.Outputs {
lockingScript = m.TransactionBase.parsedTx.Outputs[index].LockingScript.String()
destination, err := getDestinationWithCache(ctx, m.Client(), "", "", lockingScript, opts...)
if err != nil {
m.Client().Logger().Warn(ctx, "error getting destination: "+err.Error())
} else if destination != nil && destination.LockingScript == lockingScript {
matchingOutput = true
break
}
}

// Does not match any known destination
if !matchingOutput {
if !m.TransactionBase.hasOneKnownDestination(ctx, m.Client()) {
return ErrNoMatchingOutputs
}

Expand Down Expand Up @@ -365,10 +344,6 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge
logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): processUtxos() for %s failed. Reason: %s", incomingTx.ID, err))
return err
}

transaction.TotalValue, transaction.Fee = transaction.getValues()
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
}

transaction.setChainInfo(txInfo)
Expand Down
6 changes: 0 additions & 6 deletions model_transaction_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ const (
ChangeStrategyNominations ChangeStrategy = "nominations"
)

// Types of Transaction actions
const (
// TransactionActionCheck Get on-chain data about the transaction which have height == 0(IE: block hash, height, etc)
TransactionActionCheck = "check"
)

// ScriptOutput is the actual script record (could be several for one output record)
type ScriptOutput struct {
Address string `json:"address,omitempty"` // Hex encoded locking script
Expand Down
51 changes: 12 additions & 39 deletions model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/libsv/go-bt/v2"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
)

Expand Down Expand Up @@ -238,6 +237,10 @@ func (m *Transaction) setBUMP(txInfo *chainstate.TransactionInfo) {
m.BUMP = bump
}

func (m *Transaction) isMined() bool {
return m.BlockHash != ""
}

// IsXpubAssociated will check if this key is associated to this transaction
func (m *Transaction) IsXpubAssociated(rawXpubKey string) bool {
// Hash the raw key
Expand Down Expand Up @@ -302,49 +305,19 @@ func (m *Transaction) Display() interface{} {
// hasOneKnownDestination will check if the transaction has at least one known destination
//
// This is used to validate if an external transaction should be recorded into the engine
func (m *TransactionBase) hasOneKnownDestination(ctx context.Context, client ClientInterface, opts ...ModelOps) bool {
func (m *TransactionBase) hasOneKnownDestination(ctx context.Context, client ClientInterface) bool {
// todo: this can be optimized searching X records at a time vs loop->query->loop->query
lockingScript := ""
for index := range m.parsedTx.Outputs {
lockingScript = m.parsedTx.Outputs[index].LockingScript.String()
destination, err := getDestinationWithCache(ctx, client, "", "", lockingScript, opts...)
for _, output := range m.parsedTx.Outputs {
lockingScript := output.LockingScript.String()
destination, err := getDestinationWithCache(ctx, client, "", "", lockingScript)

if err != nil {
destination = newDestination("", lockingScript, opts...)
destination.Client().Logger().Error(ctx, "error getting destination: "+err.Error())
client.Logger().Error(ctx, "error getting destination: "+err.Error())
continue

} else if destination != nil && destination.LockingScript == lockingScript {
return true
}
}
return false
}

// RegisterTasks will register the model specific tasks on client initialization
func (m *Transaction) RegisterTasks() error {
// No task manager loaded?
tm := m.Client().Taskmanager()
if tm == nil {
return nil
}

ctx := context.Background()
checkTask := m.Name() + "_" + TransactionActionCheck

if err := tm.RegisterTask(&taskmanager.Task{
Name: checkTask,
RetryLimit: 1,
Handler: func(client ClientInterface) error {
if taskErr := taskCheckTransactions(ctx, client.Logger(), WithClient(client)); taskErr != nil {
client.Logger().Error(ctx, "error running "+checkTask+" task: "+taskErr.Error())
}
return nil
},
}); err != nil {
return err
}

return tm.RunTask(ctx, &taskmanager.TaskOptions{
Arguments: []interface{}{m.Client()},
RunEveryPeriod: m.Client().GetTaskPeriod(checkTask),
TaskName: checkTask,
})
}
22 changes: 0 additions & 22 deletions model_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,13 +694,6 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() {
err = transaction.processUtxos(tc.ctx)
require.NoError(t, err)

transaction.TotalValue, transaction.Fee = transaction.getValues()

if transaction.TransactionBase.parsedTx != nil {
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))
}

err = transaction.Save(tc.ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -757,12 +750,6 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() {
err = transactionIn.processUtxos(tc.ctx)
require.NoError(t, err)

transactionIn.TotalValue, transactionIn.Fee = transactionIn.getValues()

if transactionIn.TransactionBase.parsedTx != nil {
transactionIn.NumberOfInputs = uint32(len(transactionIn.TransactionBase.parsedTx.Inputs))
transactionIn.NumberOfOutputs = uint32(len(transactionIn.TransactionBase.parsedTx.Outputs))
}
err = transactionIn.Save(tc.ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -797,15 +784,6 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() {
err = transaction.processUtxos(tc.ctx)
require.NoError(t, err)

// Set the values from the inputs/outputs and draft tx
transaction.TotalValue, transaction.Fee = transactionIn.getValues()

// Add values if found
if transaction.TransactionBase.parsedTx != nil {
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))
}

err = transaction.Save(tc.ctx)
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions monitor_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (b *blockSubscriptionHandler) OnPublish(subscription *centrifuge.Subscripti
return
}

if _, err = recordMonitoredTransaction(b.ctx, b.buxClient, tx); err != nil {
if _, err = b.buxClient.RecordRawTransaction(b.ctx, tx); err != nil {
// must not override err
btTx, btErr := bt.NewTxFromString(tx)
if btErr != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ func (h *MonitorEventHandler) processMempoolPublish(_ *centrifuge.Client, e cent
if tx == "" {
return
}
if _, err = recordMonitoredTransaction(h.ctx, h.buxClient, tx); err != nil {
if _, err = h.buxClient.RecordRawTransaction(h.ctx, tx); err != nil {
h.logger.Error(h.ctx, fmt.Sprintf("[MONITOR] ERROR recording tx: %v", err))
return
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func (h *MonitorEventHandler) SetMonitor(monitor *chainstate.Monitor) {

// RecordTransaction records a transaction into bux
func (h *MonitorEventHandler) RecordTransaction(ctx context.Context, txHex string) error {
_, err := recordMonitoredTransaction(ctx, h.buxClient, txHex)
_, err := h.buxClient.RecordRawTransaction(ctx, txHex)
return err
}

Expand Down
8 changes: 1 addition & 7 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,14 @@ func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c C
tx := newTransaction(eTx.Hex, c.DefaultModelOptions(append(opts, New())...)...)
_hydrateExternalWithSync(tx)

if !tx.TransactionBase.hasOneKnownDestination(ctx, c, tx.GetOptions(false)...) {
if !tx.TransactionBase.hasOneKnownDestination(ctx, c) {
return nil, ErrNoMatchingOutputs
}

if err := tx.processUtxos(ctx); err != nil {
return nil, err
}

tx.TotalValue, tx.Fee = tx.getValues()
if tx.TransactionBase.parsedTx != nil {
tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs))
tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs))
}

return tx, nil
}

Expand Down
Loading

0 comments on commit 5acd0e9

Please sign in to comment.