Skip to content

Commit

Permalink
Merge pull request #2436 from iotaledger/feat/nodeconn-cancel-pow-if-…
Browse files Browse the repository at this point in the history
…tx-seen

Cancel ongoing PoW if the tx was seen in another block
  • Loading branch information
muXxer authored May 8, 2023
2 parents eba52fd + 11a0545 commit 734c9f0
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 20 deletions.
17 changes: 11 additions & 6 deletions packages/nodeconn/nc_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ func (ncc *ncChain) getLastPendingTx() *pendingTransaction {
defer ncc.lastPendingTxLock.Unlock()

if ncc.lastPendingTx != nil {
ncc.nodeConn.pendingTransactionsLock.Lock()
defer ncc.nodeConn.pendingTransactionsLock.Unlock()
ncc.nodeConn.pendingTransactionsLock.RLock()
defer ncc.nodeConn.pendingTransactionsLock.RUnlock()

// check if the transaction is still pending, otherwise reset it
if ncc.nodeConn.pendingTransactionsMap.Has(ncc.lastPendingTx.transactionID) {
Expand Down Expand Up @@ -337,12 +337,17 @@ func (ncc *ncChain) postTxLoop(ctx context.Context) {

ncc.LogDebugf("posting transaction %s (chainID: %s, isReattachment: %t%s)...", pendingTx.ID().ToHex(), ncc.chainID, isReattachment, debugInfoChaining)

// we link the ctxAttach to ctxConfirmed and ncChain.ctx.
// this way the proof of work will be canceled if the transaction already got confirmed on L1.
// (e.g. another validator finished PoW and tx was confirmed)
if isReattachment {
// in case it is a reattachment, we have to renew the ctxPublished beforehand, because it may have already been
// canceled by a previous block on L1, which was orphaned.
pendingTx.ctxPublished, pendingTx.cancelCtxPublished = context.WithCancel(pendingTx.ctxConfirmed)
}

// we link the ctxAttach to ctxPublished and ncChain.ctx.
// this way the proof of work will be canceled if the transaction already got confirmed on L1 or was published by another validator.
// the given context will be canceled by the pending transaction checks.
// the context will also be canceled if the ncChain.ctx gets canceled by shutdown signal or "Chains.Deactivate".
ctxAttach, cancelCtxAttach := contextutils.MergeContexts(pendingTx.ctxConfirmed, ncc.ctx)
ctxAttach, cancelCtxAttach := contextutils.MergeContexts(pendingTx.ctxPublished, ncc.ctx)
defer cancelCtxAttach()

ctxAttachWithTimeout, cancelCtxAttachWithTimeout := context.WithTimeout(ctxAttach, inxTimeoutPublishTransaction)
Expand Down
133 changes: 119 additions & 14 deletions packages/nodeconn/nodeconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ import (
)

const (
indexerPluginAvailableTimeout = 30 * time.Second
l1NodeSyncWaitTimeout = 2 * time.Minute
inxTimeoutInfo = 500 * time.Millisecond
inxTimeoutBlockMetadata = 500 * time.Millisecond
inxTimeoutSubmitBlock = 60 * time.Second
inxTimeoutPublishTransaction = 120 * time.Second
inxTimeoutIndexerQuery = 2 * time.Second
inxTimeoutMilestone = 2 * time.Second
inxTimeoutOutput = 2 * time.Second
inxTimeoutGetPeers = 2 * time.Second
indexerPluginAvailableTimeout = 30 * time.Second
l1NodeSyncWaitTimeout = 2 * time.Minute
blockMetadataCheckTimeout = 3 * time.Second
blockMetadataCheckCooldownTime = 100 * time.Millisecond
inxTimeoutInfo = 500 * time.Millisecond
inxTimeoutBlockMetadata = 500 * time.Millisecond
inxTimeoutSubmitBlock = 60 * time.Second
inxTimeoutPublishTransaction = 120 * time.Second
inxTimeoutIndexerQuery = 2 * time.Second
inxTimeoutMilestone = 2 * time.Second
inxTimeoutOutput = 2 * time.Second
inxTimeoutGetPeers = 2 * time.Second

chainsCleanupThresholdRatio = 50.0
chainsCleanupThresholdCount = 10
Expand Down Expand Up @@ -73,7 +75,7 @@ type nodeConnection struct {

// pendingTransactionsMap is a map of sent transactions that are pending.
pendingTransactionsMap *shrinkingmap.ShrinkingMap[iotago.TransactionID, *pendingTransaction]
pendingTransactionsLock sync.Mutex
pendingTransactionsLock sync.RWMutex
reattachWorkerPool *workerpool.WorkerPool

shutdownHandler *shutdown.ShutdownHandler
Expand Down Expand Up @@ -111,7 +113,7 @@ func New(
shrinkingmap.WithShrinkingThresholdRatio(pendingTransactionsCleanupThresholdRatio),
shrinkingmap.WithShrinkingThresholdCount(pendingTransactionsCleanupThresholdCount),
),
pendingTransactionsLock: sync.Mutex{},
pendingTransactionsLock: sync.RWMutex{},
shutdownHandler: shutdownHandler,
}

Expand Down Expand Up @@ -293,6 +295,7 @@ func (nc *nodeConnection) Run(ctx context.Context) error {

nc.reattachWorkerPool.Start()
go nc.subscribeToLedgerUpdates()
go nc.subscribeToBlocks()

// mark the node connection as synced
nc.syncedCtxCancel()
Expand Down Expand Up @@ -352,6 +355,20 @@ func (nc *nodeConnection) subscribeToLedgerUpdates() {
}
}

func (nc *nodeConnection) subscribeToBlocks() {
if err := nc.nodeBridge.ListenToBlocks(nc.ctx, func() {}, nc.handleBlock); err != nil && !errors.Is(err, io.EOF) {
nc.LogError(err)
nc.shutdownHandler.SelfShutdown(
fmt.Sprintf("INX connection unexpected error: %s", err.Error()),
true)
return
}
if nc.ctx.Err() == nil {
// shutdown in case there isn't a shutdown already in progress
nc.shutdownHandler.SelfShutdown("INX connection closed", true)
}
}

func (nc *nodeConnection) getMilestoneTimestamp(ctx context.Context, msIndex iotago.MilestoneIndex) (time.Time, error) {
ctx, cancel := newCtxWithTimeout(ctx, inxTimeoutMilestone)
defer cancel()
Expand Down Expand Up @@ -486,6 +503,77 @@ func (nc *nodeConnection) triggerChainCallbacks(ledgerUpdate *ledgerUpdate) erro
return nil
}

func (nc *nodeConnection) checkReceivedTxPendingAndCancelPoW(block *iotago.Block, txPayload *iotago.Transaction) {
txID, err := txPayload.ID()
if err != nil {
return
}

nc.pendingTransactionsLock.RLock()
pendingTx, has := nc.pendingTransactionsMap.Get(txID)
if !has {
nc.pendingTransactionsLock.RUnlock()
return
}
nc.pendingTransactionsLock.RUnlock()

// some chain is waiting for the received tx payload
// => check the quality of the block and cancel ongoing PoW tasks
blockID, err := block.ID()
if err != nil {
return
}

// asynchronously check the quality of the received block and cancel the PoW if possible
go func() {
ctxWithTimeout, ctxCancel := context.WithTimeout(nc.ctx, blockMetadataCheckTimeout)
defer ctxCancel()

for ctxWithTimeout.Err() == nil {
ctxMetaWithTimeout, ctxMetaCancel := context.WithTimeout(nc.ctx, inxTimeoutBlockMetadata)

metadata, err := nc.nodeBridge.BlockMetadata(ctxMetaWithTimeout, blockID)
if err != nil {
// block not found yet => try again
ctxMetaCancel()

// block not found yet
// => try again after some timeout
time.Sleep(blockMetadataCheckCooldownTime)
continue
}
ctxMetaCancel()

// => check if the block is solid
if !metadata.Solid {
// block not solid yet
// => try again after some timeout
time.Sleep(blockMetadataCheckCooldownTime)
continue
}

// check if the block was already referenced
if metadata.ReferencedByMilestoneIndex != 0 {
// block with the tracked tx already got referenced, we can abort attachment of the tx
pendingTx.SetPublished()
break
}

// block not referenced yet
// => check if the quality of the tips is good or if the block can never be referenced
if metadata.ShouldReattach {
// we can abort the block metadata check, but we should not abort our own attachment of the tx
break
}

// block is solid and the quality of the tips seem fine
// => abort our own attachment
pendingTx.SetPublished()
break
}
}()
}

type ledgerUpdate struct {
milestoneIndex iotago.MilestoneIndex
milestoneTimestamp time.Time
Expand Down Expand Up @@ -554,6 +642,23 @@ func (nc *nodeConnection) handleLedgerUpdate(update *nodebridge.LedgerUpdate) er
return nil
}

func (nc *nodeConnection) handleBlock(block *iotago.Block) {
if block == nil || block.Payload == nil {
return
}

// check if the block contains a transaction payload
txPayload, ok := block.Payload.(*iotago.Transaction)
if !ok {
// not a transaction payload
return
}

// check if the same tx is being tracked in any of the chains,
// and cancel the ongoing PoW if the received tx is attached correctly.
nc.checkReceivedTxPendingAndCancelPoW(block, txPayload)
}

// GetChain returns the chain if it was registered, otherwise it returns an error.
func (nc *nodeConnection) GetChain(chainID isc.ChainID) (*ncChain, error) {
nc.chainsLock.RLock()
Expand Down Expand Up @@ -625,8 +730,8 @@ func (nc *nodeConnection) addPendingTransaction(pending *pendingTransaction) {

// hasPendingTransaction returns true if a pending transaction exists.
func (nc *nodeConnection) hasPendingTransaction(txID iotago.TransactionID) bool {
nc.pendingTransactionsLock.Lock()
defer nc.pendingTransactionsLock.Unlock()
nc.pendingTransactionsLock.RLock()
defer nc.pendingTransactionsLock.RUnlock()

return nc.pendingTransactionsMap.Has(txID)
}
Expand Down
15 changes: 15 additions & 0 deletions packages/nodeconn/pending_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type pendingTransaction struct {
// if this context gets canceled, the tx should not be tracked by the node connection anymore.
ctxChainConsensus context.Context

// this context is used to signal that the transaction was published to the network (maybe by other validators).
// the metadata of the containing block was fine (solid, no-reattach flag set).
// this context can be used to abort the ongoing PoW in case another validator was faster with publishing the tx.
ctxPublished context.Context
cancelCtxPublished context.CancelFunc

// this context is used to signal that the transaction got referenced by a milestone.
// it might be confirmed or conflicting, or the parent ctxChainConsensus got canceled.
ctxConfirmed context.Context
Expand Down Expand Up @@ -60,8 +66,13 @@ func newPendingTransaction(ctxChainConsensus context.Context, ncChain *ncChain,

ctxConfirmed, cancelCtxConfirmed := context.WithCancel(ctxChainConsensus)

// if it was confirmed, it was also already published
ctxPublished, cancelCtxPublished := context.WithCancel(ctxConfirmed)

pendingTx := &pendingTransaction{
ctxChainConsensus: ctxChainConsensus,
ctxPublished: ctxPublished,
cancelCtxPublished: cancelCtxPublished,
ctxConfirmed: ctxConfirmed,
cancelCtxConfirmed: cancelCtxConfirmed,
ncChain: ncChain,
Expand Down Expand Up @@ -135,6 +146,10 @@ func (tx *pendingTransaction) propagateReattach() {
}
}

func (tx *pendingTransaction) SetPublished() {
tx.cancelCtxPublished()
}

func (tx *pendingTransaction) Confirmed() bool {
return tx.confirmed.Load()
}
Expand Down

0 comments on commit 734c9f0

Please sign in to comment.