diff --git a/packages/nodeconn/nc_chain.go b/packages/nodeconn/nc_chain.go
index d43c5a219f..7428f85bd0 100644
--- a/packages/nodeconn/nc_chain.go
+++ b/packages/nodeconn/nc_chain.go
@@ -235,8 +235,8 @@ func (ncc *ncChain) getLastPendingTx() *pendingTransaction {
 	defer ncc.lastPendingTxLock.Unlock()
 
 	if ncc.lastPendingTx != nil {
-		ncc.nodeConn.pendingTransactionsLock.Lock()
-		defer ncc.nodeConn.pendingTransactionsLock.Unlock()
+		ncc.nodeConn.pendingTransactionsLock.RLock()
+		defer ncc.nodeConn.pendingTransactionsLock.RUnlock()
 
 		// check if the transaction is still pending, otherwise reset it
 		if ncc.nodeConn.pendingTransactionsMap.Has(ncc.lastPendingTx.transactionID) {
@@ -337,12 +337,21 @@ func (ncc *ncChain) postTxLoop(ctx context.Context) {
 
 		ncc.LogDebugf("posting transaction %s (chainID: %s, isReattachment: %t%s)...", pendingTx.ID().ToHex(), ncc.chainID, isReattachment, debugInfoChaining)
 
-		// we link the ctxAttach to ctxConfirmed and ncChain.ctx.
-		// this way the proof of work will be canceled if the transaction already got confirmed on L1.
-		// (e.g. another validator finished PoW and tx was confirmed)
+		if isReattachment {
+			// in case it is a reattachment, we have to renew the ctxPublished beforehand, because it may have already been
+			// canceled by a previous block on L1, which was orphaned.
+			// the previous context is released first, otherwise it stays attached to ctxConfirmed until that one is canceled.
+			// NOTE(review): SetPublished may run concurrently in the block listener goroutine while the fields are replaced here,
+			// the access to ctxPublished/cancelCtxPublished should be synchronized.
+			pendingTx.cancelCtxPublished()
+			pendingTx.ctxPublished, pendingTx.cancelCtxPublished = context.WithCancel(pendingTx.ctxConfirmed)
+		}
+
+		// we link the ctxAttach to ctxPublished and ncChain.ctx.
+		// this way the proof of work will be canceled if the transaction already got confirmed on L1 or was published by another validator.
 		// the given context will be canceled by the pending transaction checks.
 		// the context will also be canceled if the ncChain.ctx gets canceled by shutdown signal or "Chains.Deactivate".
-		ctxAttach, cancelCtxAttach := contextutils.MergeContexts(pendingTx.ctxConfirmed, ncc.ctx)
+		ctxAttach, cancelCtxAttach := contextutils.MergeContexts(pendingTx.ctxPublished, ncc.ctx)
 		defer cancelCtxAttach()
 
 		ctxAttachWithTimeout, cancelCtxAttachWithTimeout := context.WithTimeout(ctxAttach, inxTimeoutPublishTransaction)
diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go
index 7278c52641..135dd836b2 100644
--- a/packages/nodeconn/nodeconn.go
+++ b/packages/nodeconn/nodeconn.go
@@ -34,16 +34,18 @@ import (
 )
 
 const (
-	indexerPluginAvailableTimeout = 30 * time.Second
-	l1NodeSyncWaitTimeout         = 2 * time.Minute
-	inxTimeoutInfo                = 500 * time.Millisecond
-	inxTimeoutBlockMetadata       = 500 * time.Millisecond
-	inxTimeoutSubmitBlock         = 60 * time.Second
-	inxTimeoutPublishTransaction  = 120 * time.Second
-	inxTimeoutIndexerQuery        = 2 * time.Second
-	inxTimeoutMilestone           = 2 * time.Second
-	inxTimeoutOutput              = 2 * time.Second
-	inxTimeoutGetPeers            = 2 * time.Second
+	indexerPluginAvailableTimeout  = 30 * time.Second
+	l1NodeSyncWaitTimeout          = 2 * time.Minute
+	blockMetadataCheckTimeout      = 3 * time.Second
+	blockMetadataCheckCooldownTime = 100 * time.Millisecond
+	inxTimeoutInfo                 = 500 * time.Millisecond
+	inxTimeoutBlockMetadata        = 500 * time.Millisecond
+	inxTimeoutSubmitBlock          = 60 * time.Second
+	inxTimeoutPublishTransaction   = 120 * time.Second
+	inxTimeoutIndexerQuery         = 2 * time.Second
+	inxTimeoutMilestone            = 2 * time.Second
+	inxTimeoutOutput               = 2 * time.Second
+	inxTimeoutGetPeers             = 2 * time.Second
 
 	chainsCleanupThresholdRatio = 50.0
 	chainsCleanupThresholdCount = 10
@@ -73,7 +75,7 @@ type nodeConnection struct {
 
 	// pendingTransactionsMap is a map of sent transactions that are pending.
 	pendingTransactionsMap  *shrinkingmap.ShrinkingMap[iotago.TransactionID, *pendingTransaction]
-	pendingTransactionsLock sync.Mutex
+	pendingTransactionsLock sync.RWMutex
 	reattachWorkerPool      *workerpool.WorkerPool
 
 	shutdownHandler *shutdown.ShutdownHandler
@@ -111,7 +113,7 @@ func New(
 			shrinkingmap.WithShrinkingThresholdRatio(pendingTransactionsCleanupThresholdRatio),
 			shrinkingmap.WithShrinkingThresholdCount(pendingTransactionsCleanupThresholdCount),
 		),
-		pendingTransactionsLock: sync.Mutex{},
+		pendingTransactionsLock: sync.RWMutex{},
 		shutdownHandler:         shutdownHandler,
 	}
 
@@ -293,6 +295,7 @@ func (nc *nodeConnection) Run(ctx context.Context) error {
 	nc.reattachWorkerPool.Start()
 
 	go nc.subscribeToLedgerUpdates()
+	go nc.subscribeToBlocks()
 
 	// mark the node connection as synced
 	nc.syncedCtxCancel()
@@ -352,6 +355,22 @@ func (nc *nodeConnection) subscribeToLedgerUpdates() {
 	}
 }
 
+// subscribeToBlocks listens to blocks via the node bridge and passes every block to handleBlock.
+// It triggers a self shutdown if listening fails with an error other than io.EOF, or if it returns while nc.ctx is not canceled.
+func (nc *nodeConnection) subscribeToBlocks() {
+	if err := nc.nodeBridge.ListenToBlocks(nc.ctx, func() {}, nc.handleBlock); err != nil && !errors.Is(err, io.EOF) {
+		nc.LogError(err)
+		nc.shutdownHandler.SelfShutdown(
+			fmt.Sprintf("INX connection unexpected error: %s", err.Error()),
+			true)
+		return
+	}
+	if nc.ctx.Err() == nil {
+		// shutdown in case there isn't a shutdown already in progress
+		nc.shutdownHandler.SelfShutdown("INX connection closed", true)
+	}
+}
+
 func (nc *nodeConnection) getMilestoneTimestamp(ctx context.Context, msIndex iotago.MilestoneIndex) (time.Time, error) {
 	ctx, cancel := newCtxWithTimeout(ctx, inxTimeoutMilestone)
 	defer cancel()
@@ -486,6 +505,75 @@ func (nc *nodeConnection) triggerChainCallbacks(ledgerUpdate *ledgerUpdate) erro
 	return nil
 }
 
+// checkReceivedTxPendingAndCancelPoW checks if the transaction of the received block is tracked as pending.
+// If so, the metadata of the block is polled asynchronously, and the pending transaction is marked as published
+// as soon as the block is solid and either referenced by a milestone or not flagged for reattachment.
+func (nc *nodeConnection) checkReceivedTxPendingAndCancelPoW(block *iotago.Block, txPayload *iotago.Transaction) {
+	txID, err := txPayload.ID()
+	if err != nil {
+		// without a transaction ID there is nothing to look up
+		return
+	}
+
+	nc.pendingTransactionsLock.RLock()
+	pendingTx, has := nc.pendingTransactionsMap.Get(txID)
+	nc.pendingTransactionsLock.RUnlock()
+	if !has {
+		return
+	}
+
+	// some chain is waiting for the received tx payload
+	// => check the quality of the block and cancel ongoing PoW tasks
+	blockID, err := block.ID()
+	if err != nil {
+		return
+	}
+
+	// asynchronously check the quality of the received block and cancel the PoW if possible
+	go func() {
+		ctxWithTimeout, ctxCancel := context.WithTimeout(nc.ctx, blockMetadataCheckTimeout)
+		defer ctxCancel()
+
+		// waitCooldown pauses between two metadata checks.
+		// it returns false if the check timed out or nc.ctx was canceled in the meantime.
+		waitCooldown := func() bool {
+			select {
+			case <-ctxWithTimeout.Done():
+				return false
+			case <-time.After(blockMetadataCheckCooldownTime):
+				return true
+			}
+		}
+
+		for ctxWithTimeout.Err() == nil {
+			// the single request must not outlive the overall metadata check
+			ctxMetaWithTimeout, ctxMetaCancel := context.WithTimeout(ctxWithTimeout, inxTimeoutBlockMetadata)
+			metadata, err := nc.nodeBridge.BlockMetadata(ctxMetaWithTimeout, blockID)
+			ctxMetaCancel()
+
+			if err != nil || !metadata.Solid {
+				// block not found or not solid yet
+				// => try again after some cooldown
+				if !waitCooldown() {
+					return
+				}
+				continue
+			}
+
+			// check if the block is not referenced yet and can never be referenced
+			if metadata.ReferencedByMilestoneIndex == 0 && metadata.ShouldReattach {
+				// we can abort the block metadata check, but we should not abort our own attachment of the tx
+				return
+			}
+
+			// block with the tracked tx already got referenced, or it is solid and the quality of the tips seems fine
+			// => abort our own attachment
+			pendingTx.SetPublished()
+			return
+		}
+	}()
+}
+
 type ledgerUpdate struct {
 	milestoneIndex     iotago.MilestoneIndex
 	milestoneTimestamp time.Time
@@ -554,6 +642,25 @@ func (nc *nodeConnection) handleLedgerUpdate(update *nodebridge.LedgerUpdate) er
 	return nil
 }
 
+// handleBlock is the callback passed to ListenToBlocks.
+// It ignores nil blocks and blocks that do not carry a transaction payload.
+func (nc *nodeConnection) handleBlock(block *iotago.Block) {
+	if block == nil || block.Payload == nil {
+		return
+	}
+
+	// check if the block contains a transaction payload
+	txPayload, ok := block.Payload.(*iotago.Transaction)
+	if !ok {
+		// not a transaction payload
+		return
+	}
+
+	// check if the same tx is being tracked in any of the chains,
+	// and cancel the ongoing PoW if the received tx is attached correctly.
+	nc.checkReceivedTxPendingAndCancelPoW(block, txPayload)
+}
+
 // GetChain returns the chain if it was registered, otherwise it returns an error.
 func (nc *nodeConnection) GetChain(chainID isc.ChainID) (*ncChain, error) {
 	nc.chainsLock.RLock()
@@ -625,8 +732,8 @@ func (nc *nodeConnection) addPendingTransaction(pending *pendingTransaction) {
 
 // hasPendingTransaction returns true if a pending transaction exists.
 func (nc *nodeConnection) hasPendingTransaction(txID iotago.TransactionID) bool {
-	nc.pendingTransactionsLock.Lock()
-	defer nc.pendingTransactionsLock.Unlock()
+	nc.pendingTransactionsLock.RLock()
+	defer nc.pendingTransactionsLock.RUnlock()
 
 	return nc.pendingTransactionsMap.Has(txID)
 }
diff --git a/packages/nodeconn/pending_tx.go b/packages/nodeconn/pending_tx.go
index 21d9c9cf82..73f3759110 100644
--- a/packages/nodeconn/pending_tx.go
+++ b/packages/nodeconn/pending_tx.go
@@ -20,6 +20,12 @@ type pendingTransaction struct {
 	// if this context gets canceled, the tx should not be tracked by the node connection anymore.
 	ctxChainConsensus context.Context
 
+	// this context is used to signal that the transaction was published to the network (maybe by other validators).
+	// the metadata of the containing block was fine (solid, no-reattach flag set).
+	// this context can be used to abort the ongoing PoW in case another validator was faster with publishing the tx.
+	ctxPublished       context.Context
+	cancelCtxPublished context.CancelFunc
+
 	// this context is used to signal that the transaction got referenced by a milestone.
 	// it might be confirmed or conflicting, or the parent ctxChainConsensus got canceled.
 	ctxConfirmed       context.Context
@@ -60,8 +66,13 @@ func newPendingTransaction(ctxChainConsensus context.Context, ncChain *ncChain,
 
 	ctxConfirmed, cancelCtxConfirmed := context.WithCancel(ctxChainConsensus)
 
+	// if it was confirmed, it was also already published
+	ctxPublished, cancelCtxPublished := context.WithCancel(ctxConfirmed)
+
 	pendingTx := &pendingTransaction{
 		ctxChainConsensus:  ctxChainConsensus,
+		ctxPublished:       ctxPublished,
+		cancelCtxPublished: cancelCtxPublished,
 		ctxConfirmed:       ctxConfirmed,
 		cancelCtxConfirmed: cancelCtxConfirmed,
 		ncChain:            ncChain,
@@ -135,6 +146,11 @@ func (tx *pendingTransaction) propagateReattach() {
 	}
 }
 
+// SetPublished signals that the transaction was published to the network by canceling ctxPublished.
+func (tx *pendingTransaction) SetPublished() {
+	tx.cancelCtxPublished()
+}
+
 func (tx *pendingTransaction) Confirmed() bool {
 	return tx.confirmed.Load()
 }