diff --git a/pkg/chain/local_v1/blockcounter.go b/pkg/chain/local_v1/blockcounter.go index 0a99e9b815..69939b29f0 100644 --- a/pkg/chain/local_v1/blockcounter.go +++ b/pkg/chain/local_v1/blockcounter.go @@ -20,7 +20,7 @@ type watcher struct { channel chan uint64 } -var blockTime = time.Duration(500 * time.Millisecond) +var defaultBlockTime = 500 * time.Millisecond func (lbc *localBlockCounter) WaitForBlockHeight(blockNumber uint64) error { waiter, err := lbc.BlockHeightWaiter(blockNumber) @@ -88,8 +88,16 @@ func (lbc *localBlockCounter) WatchBlocks(ctx context.Context) <-chan uint64 { // count is an internal function that counts up time to simulate the generation // of blocks. -func (lbc *localBlockCounter) count() { - ticker := time.NewTicker(blockTime) +func (lbc *localBlockCounter) count(blockTime ...time.Duration) { + var resolvedBlockTime time.Duration + switch len(blockTime) { + case 1: + resolvedBlockTime = blockTime[0] + default: + resolvedBlockTime = defaultBlockTime + } + + ticker := time.NewTicker(resolvedBlockTime) for range ticker.C { lbc.structMutex.Lock() @@ -127,10 +135,10 @@ func (lbc *localBlockCounter) count() { // BlockCounter creates a BlockCounter that runs completely locally. It is // designed to simply increase block height at a set time interval in the // background. -func BlockCounter() (chain.BlockCounter, error) { +func BlockCounter(blockTime ...time.Duration) (chain.BlockCounter, error) { counter := localBlockCounter{blockHeight: 0, waiters: make(map[uint64][]chan uint64)} - go counter.count() + go counter.count(blockTime...) return &counter, nil } diff --git a/pkg/chain/local_v1/local_test.go b/pkg/chain/local_v1/local_test.go index 76f8ed992b..f061f9a6e8 100644 --- a/pkg/chain/local_v1/local_test.go +++ b/pkg/chain/local_v1/local_test.go @@ -339,13 +339,13 @@ func TestLocalBlockHeightWaiter(t *testing.T) { }, "returns immediately for block height already reached": { blockHeight: 2, - initialDelay: 3 * blockTime, + initialDelay: 3 * defaultBlockTime, expectedWaitTime: 0, }, "waits for block height not yet reached": { blockHeight: 5, - initialDelay: 2 * blockTime, - expectedWaitTime: 3 * blockTime, + initialDelay: 2 * defaultBlockTime, + expectedWaitTime: 3 * defaultBlockTime, }, } diff --git a/pkg/tbtc/chain.go b/pkg/tbtc/chain.go index 533b5ab925..f6aa241b1c 100644 --- a/pkg/tbtc/chain.go +++ b/pkg/tbtc/chain.go @@ -363,6 +363,8 @@ type WalletCoordinatorChain interface { // HeartbeatRequestSubmittedEvent represents a wallet heartbeat request // submitted to the chain. +// +// TODO: Remove this type and all related code. type HeartbeatRequestSubmittedEvent struct { WalletPublicKeyHash [20]byte Message []byte @@ -370,19 +372,10 @@ type HeartbeatRequestSubmittedEvent struct { BlockNumber uint64 } -// DepositSweepProposal represents a deposit sweep proposal submitted to the chain. -type DepositSweepProposal struct { - WalletPublicKeyHash [20]byte - DepositsKeys []struct { - FundingTxHash bitcoin.Hash - FundingOutputIndex uint32 - } - SweepTxFee *big.Int - DepositsRevealBlocks []*big.Int -} - // DepositSweepProposalSubmittedEvent represents a deposit sweep proposal // submission event. +// +// TODO: Remove this type and all related code. type DepositSweepProposalSubmittedEvent struct { Proposal *DepositSweepProposal Coordinator chain.Address @@ -404,6 +397,8 @@ type DepositSweepProposalSubmittedEventFilter struct { // RedemptionProposalSubmittedEvent represents a redemption proposal // submission event. +// +// TODO: Remove this type and all related code. type RedemptionProposalSubmittedEvent struct { Proposal *RedemptionProposal Coordinator chain.Address @@ -423,13 +418,6 @@ type RedemptionProposalSubmittedEventFilter struct { WalletPublicKeyHash [20]byte } -// RedemptionProposal represents a redemption proposal submitted to the chain. -type RedemptionProposal struct { - WalletPublicKeyHash [20]byte - RedeemersOutputScripts []bitcoin.Script - RedemptionTxFee *big.Int -} - // RedemptionRequestedEvent represents a redemption requested event. type RedemptionRequestedEvent struct { WalletPublicKeyHash [20]byte diff --git a/pkg/tbtc/chain_test.go b/pkg/tbtc/chain_test.go index 4035ec7f16..45d60b830c 100644 --- a/pkg/tbtc/chain_test.go +++ b/pkg/tbtc/chain_test.go @@ -794,19 +794,22 @@ func buildRedemptionProposalValidationKey( } // Connect sets up the local chain. -func Connect() *localChain { +func Connect(blockTime ...time.Duration) *localChain { operatorPrivateKey, _, err := operator.GenerateKeyPair(local_v1.DefaultCurve) if err != nil { panic(err) } - return ConnectWithKey(operatorPrivateKey) + return ConnectWithKey(operatorPrivateKey, blockTime...) } // ConnectWithKey sets up the local chain using the provided operator private // key. -func ConnectWithKey(operatorPrivateKey *operator.PrivateKey) *localChain { - blockCounter, _ := local_v1.BlockCounter() +func ConnectWithKey( + operatorPrivateKey *operator.PrivateKey, + blockTime ...time.Duration, +) *localChain { + blockCounter, _ := local_v1.BlockCounter(blockTime...) localChain := &localChain{ dkgResultSubmissionHandlers: make( diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go new file mode 100644 index 0000000000..4c1859499e --- /dev/null +++ b/pkg/tbtc/coordination.go @@ -0,0 +1,257 @@ +package tbtc + +import ( + "context" + "fmt" + "github.com/keep-network/keep-core/pkg/chain" + "github.com/keep-network/keep-core/pkg/generator" + "github.com/keep-network/keep-core/pkg/net" + "github.com/keep-network/keep-core/pkg/protocol/group" + "golang.org/x/sync/semaphore" +) + +const ( + // coordinationFrequencyBlocks is the number of blocks between two + // consecutive coordination windows. + coordinationFrequencyBlocks = 900 + // coordinationActivePhaseDurationBlocks is the number of blocks in the + // active phase of the coordination window. The active phase is the + // phase during which the communication between the coordination leader and + // their followers is allowed. + coordinationActivePhaseDurationBlocks = 80 + // coordinationPassivePhaseDurationBlocks is the number of blocks in the + // passive phase of the coordination window. The passive phase is the + // phase during which communication is not allowed. Participants are + // expected to validate the result of the coordination and prepare for + // execution of the proposed wallet action. + coordinationPassivePhaseDurationBlocks = 20 + // coordinationDurationBlocks is the number of blocks in a single + // coordination window. + coordinationDurationBlocks = coordinationActivePhaseDurationBlocks + + coordinationPassivePhaseDurationBlocks +) + +// errCoordinationExecutorBusy is an error returned when the coordination +// executor cannot execute the requested coordination due to an ongoing one. +var errCoordinationExecutorBusy = fmt.Errorf("coordination executor is busy") + +// coordinationWindow represents a single coordination window. The coordination +// block is the first block of the window. +type coordinationWindow struct { + // coordinationBlock is the first block of the coordination window. + coordinationBlock uint64 +} + +// newCoordinationWindow creates a new coordination window for the given +// coordination block. +func newCoordinationWindow(coordinationBlock uint64) *coordinationWindow { + return &coordinationWindow{ + coordinationBlock: coordinationBlock, + } +} + +// ActivePhaseEndBlock returns the block number at which the active phase +// of the coordination window ends. +func (cw *coordinationWindow) activePhaseEndBlock() uint64 { + return cw.coordinationBlock + coordinationActivePhaseDurationBlocks +} + +// EndBlock returns the block number at which the coordination window ends. +func (cw *coordinationWindow) endBlock() uint64 { + return cw.coordinationBlock + coordinationDurationBlocks +} + +// isAfter returns true if this coordination window is after the other +// window. +func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool { + if other == nil { + return true + } + + return cw.coordinationBlock > other.coordinationBlock +} + +// watchCoordinationWindows watches for new coordination windows and runs +// the given callback when a new window is detected. The callback is run +// in a separate goroutine. It is guaranteed that the callback is not run +// twice for the same window. The context passed as the first parameter +// is used to cancel the watch. +func watchCoordinationWindows( + ctx context.Context, + watchBlocksFn func(ctx context.Context) <-chan uint64, + onWindowFn func(window *coordinationWindow), +) { + blocksChan := watchBlocksFn(ctx) + var lastWindow *coordinationWindow + + for { + select { + case block := <-blocksChan: + if block%coordinationFrequencyBlocks == 0 { + // Make sure the current window is not the same as the last one. + // There is no guarantee that the block channel will not emit + // the same block again. + if window := newCoordinationWindow(block); window.isAfter(lastWindow) { + lastWindow = window + // Run the callback in a separate goroutine to avoid blocking + // this loop and potentially missing the next block. + go onWindowFn(window) + } + } + case <-ctx.Done(): + return + } + } +} + +// CoordinationFaultType represents a type of the coordination fault. +type CoordinationFaultType uint8 + +const ( + // FaultUnknown is a fault type used when the fault type is unknown. + FaultUnknown CoordinationFaultType = iota + // FaultLeaderIdleness is a fault type used when the leader was idle, i.e. + // missed their turn to propose a wallet action. + FaultLeaderIdleness + // FaultLeaderMistake is a fault type used when the leader's proposal + // turned out to be invalid. + FaultLeaderMistake + // FaultLeaderImpersonation is a fault type used when the leader was + // impersonated by another operator who raised their own proposal. + FaultLeaderImpersonation +) + +func (cft CoordinationFaultType) String() string { + switch cft { + case FaultUnknown: + return "Unknown" + case FaultLeaderIdleness: + return "LeaderIdleness" + case FaultLeaderMistake: + return "FaultLeaderMistake" + case FaultLeaderImpersonation: + return "LeaderImpersonation" + default: + panic("unknown coordination fault type") + } +} + +// coordinationFault represents a single coordination fault. +type coordinationFault struct { + // culprit is the address of the operator that is responsible for the fault. + culprit chain.Address + // faultType is the type of the fault. + faultType CoordinationFaultType +} + +func (cf *coordinationFault) String() string { + return fmt.Sprintf( + "operator [%s], fault [%s]", + cf.culprit, + cf.faultType, + ) +} + +// coordinationProposal represents a single action proposal for the given wallet. +type coordinationProposal interface { + // actionType returns the specific type of the walletAction being subject + // of this proposal. + actionType() WalletActionType + // validityBlocks returns the number of blocks for which the proposal is + // valid. + validityBlocks() uint64 +} + +// noopProposal is a proposal that does not propose any action. +type noopProposal struct{} + +func (np *noopProposal) actionType() WalletActionType { + return ActionNoop +} + +func (np *noopProposal) validityBlocks() uint64 { + // Panic to make sure that the proposal is not processed by the node. + panic("noop proposal does not have validity blocks") +} + +// coordinationResult represents the result of the coordination procedure +// executed for the given wallet in the given coordination window. +type coordinationResult struct { + wallet wallet + window *coordinationWindow + leader chain.Address + proposal coordinationProposal + faults []*coordinationFault +} + +func (cr *coordinationResult) String() string { + return fmt.Sprintf( + "wallet [%s], window [%v], leader [%s], proposal [%s], faults [%s]", + &cr.wallet, + cr.window.coordinationBlock, + cr.leader, + cr.proposal.actionType(), + cr.faults, + ) +} + +// coordinationExecutor is responsible for executing the coordination +// procedure for the given wallet. +type coordinationExecutor struct { + lock *semaphore.Weighted + + signers []*signer // TODO: Do we need whole signers? + broadcastChannel net.BroadcastChannel + membershipValidator *group.MembershipValidator + protocolLatch *generator.ProtocolLatch +} + +// newCoordinationExecutor creates a new coordination executor for the +// given wallet. +func newCoordinationExecutor( + signers []*signer, + broadcastChannel net.BroadcastChannel, + membershipValidator *group.MembershipValidator, + protocolLatch *generator.ProtocolLatch, +) *coordinationExecutor { + return &coordinationExecutor{ + lock: semaphore.NewWeighted(1), + signers: signers, + broadcastChannel: broadcastChannel, + membershipValidator: membershipValidator, + protocolLatch: protocolLatch, + } +} + +// wallet returns the wallet this executor is responsible for. +func (ce *coordinationExecutor) wallet() wallet { + // All signers belong to one wallet. Take that wallet from the + // first signer. + return ce.signers[0].wallet +} + +// coordinate executes the coordination procedure for the given coordination +// window. +func (ce *coordinationExecutor) coordinate( + window *coordinationWindow, +) (*coordinationResult, error) { + if lockAcquired := ce.lock.TryAcquire(1); !lockAcquired { + return nil, errCoordinationExecutorBusy + } + defer ce.lock.Release(1) + + // TODO: Implement coordination logic. Remember about: + // - Setting up the right context + // - Using the protocol latch + // - Using the membership validator + // Example result: + result := &coordinationResult{ + wallet: ce.wallet(), + window: window, + leader: ce.wallet().signingGroupOperators[0], + proposal: &noopProposal{}, + faults: nil, + } + + return result, nil +} diff --git a/pkg/tbtc/coordination_test.go b/pkg/tbtc/coordination_test.go new file mode 100644 index 0000000000..b62a3d7ffe --- /dev/null +++ b/pkg/tbtc/coordination_test.go @@ -0,0 +1,118 @@ +package tbtc + +import ( + "context" + "testing" + "time" + + "github.com/keep-network/keep-core/internal/testutils" +) + +func TestCoordinationWindow_ActivePhaseEndBlock(t *testing.T) { + window := newCoordinationWindow(900) + + testutils.AssertIntsEqual( + t, + "active phase end block", + 980, + int(window.activePhaseEndBlock()), + ) +} + +func TestCoordinationWindow_EndBlock(t *testing.T) { + window := newCoordinationWindow(900) + + testutils.AssertIntsEqual( + t, + "end block", + 1000, + int(window.endBlock()), + ) +} + +func TestCoordinationWindow_IsAfter(t *testing.T) { + window := newCoordinationWindow(1800) + + previousWindow := newCoordinationWindow(900) + sameWindow := newCoordinationWindow(1800) + nextWindow := newCoordinationWindow(2700) + + testutils.AssertBoolsEqual( + t, + "result for nil", + true, + window.isAfter(nil), + ) + testutils.AssertBoolsEqual( + t, + "result for previous window", + true, + window.isAfter(previousWindow), + ) + testutils.AssertBoolsEqual( + t, + "result for same window", + false, + window.isAfter(sameWindow), + ) + testutils.AssertBoolsEqual( + t, + "result for next window", + false, + window.isAfter(nextWindow), + ) +} + +func TestWatchCoordinationWindows(t *testing.T) { + watchBlocksFn := func(ctx context.Context) <-chan uint64 { + blocksChan := make(chan uint64) + + go func() { + ticker := time.NewTicker(1 * time.Millisecond) + defer ticker.Stop() + + block := uint64(0) + + for { + select { + case <-ticker.C: + block++ + blocksChan <- block + case <-ctx.Done(): + return + } + } + }() + + return blocksChan + } + + receivedWindows := make([]*coordinationWindow, 0) + onWindowFn := func(window *coordinationWindow) { + receivedWindows = append(receivedWindows, window) + } + + ctx, cancelCtx := context.WithTimeout( + context.Background(), + 2000*time.Millisecond, + ) + defer cancelCtx() + + go watchCoordinationWindows(ctx, watchBlocksFn, onWindowFn) + + <-ctx.Done() + + testutils.AssertIntsEqual(t, "received windows", 2, len(receivedWindows)) + testutils.AssertIntsEqual( + t, + "first window", + 900, + int(receivedWindows[0].coordinationBlock), + ) + testutils.AssertIntsEqual( + t, + "second window", + 1800, + int(receivedWindows[1].coordinationBlock), + ) +} diff --git a/pkg/tbtc/deposit_sweep.go b/pkg/tbtc/deposit_sweep.go index 902ca3722d..8a1ad2792a 100644 --- a/pkg/tbtc/deposit_sweep.go +++ b/pkg/tbtc/deposit_sweep.go @@ -3,6 +3,7 @@ package tbtc import ( "crypto/ecdsa" "fmt" + "math/big" "time" "github.com/ipfs/go-log/v2" @@ -12,6 +13,12 @@ import ( ) const ( + // depositSweepProposalValidityBlocks determines the deposit sweep proposal + // validity time expressed in blocks. In other words, this is the worst-case + // time for a deposit sweep during which the wallet is busy and cannot take + // another actions. The value of 1200 blocks is roughly 4 hours, assuming + // 12 seconds per block. + depositSweepProposalValidityBlocks = 1200 // depositSweepProposalConfirmationBlocks determines the block length of the // confirmation period on the host chain that is preserved after a deposit // sweep proposal submission. @@ -47,6 +54,26 @@ const ( depositSweepBroadcastCheckDelay = 1 * time.Minute ) +// DepositSweepProposal represents a deposit sweep proposal issued by a +// wallet's coordination leader. +type DepositSweepProposal struct { + WalletPublicKeyHash [20]byte + DepositsKeys []struct { + FundingTxHash bitcoin.Hash + FundingOutputIndex uint32 + } + SweepTxFee *big.Int + DepositsRevealBlocks []*big.Int +} + +func (dsp *DepositSweepProposal) actionType() WalletActionType { + return ActionDepositSweep +} + +func (dsp *DepositSweepProposal) validityBlocks() uint64 { + return depositSweepProposalValidityBlocks +} + // depositSweepAction is a deposit sweep walletAction. type depositSweepAction struct { logger *zap.SugaredLogger diff --git a/pkg/tbtc/heartbeat.go b/pkg/tbtc/heartbeat.go index a26c960919..09f704d178 100644 --- a/pkg/tbtc/heartbeat.go +++ b/pkg/tbtc/heartbeat.go @@ -12,11 +12,16 @@ import ( ) const ( + // heartbeatProposalValidityBlocks determines the wallet heartbeat proposal + // validity time expressed in blocks. In other words, this is the worst-case + // time for a wallet heartbeat during which the wallet is busy and cannot + // take another actions. The value of 300 blocks is roughly 1 hour, assuming + // 12 seconds per block. + heartbeatProposalValidityBlocks = 300 // heartbeatRequestConfirmationBlocks determines the block length of the // confirmation period on the host chain that is preserved after a heartbeat // request submission. heartbeatRequestConfirmationBlocks = 3 - // heartbeatRequestTimeoutSafetyMargin determines the duration of the // safety margin that must be preserved between the signing timeout // and the timeout of the entire heartbeat action. This safety @@ -25,6 +30,18 @@ const ( heartbeatRequestTimeoutSafetyMargin = 5 * time.Minute ) +type HeartbeatProposal struct { + // TODO: Proposal fields. +} + +func (hp *HeartbeatProposal) actionType() WalletActionType { + return ActionHeartbeat +} + +func (hp *HeartbeatProposal) validityBlocks() uint64 { + return heartbeatProposalValidityBlocks +} + // heartbeatSigningExecutor is an interface meant to decouple the specific // implementation of the signing executor from the heartbeat action. type heartbeatSigningExecutor interface { diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index fe7b92d3d9..48303a62ca 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -76,6 +76,15 @@ type node struct { // wallet actions and walletDispatcher to execute an action on an existing // wallet. signingExecutors map[string]*signingExecutor + + coordinationExecutorsMutex sync.Mutex + // coordinationExecutors is the cache holding coordination executors for + // specific wallets. The cache key is the uncompressed public key + // (with 04 prefix) of the wallet. The coordinationExecutor encapsulates the + // logic of the wallet coordination procedure. + // + // coordinationExecutors MUST NOT be used outside this struct. + coordinationExecutors map[string]*coordinationExecutor } func newNode( @@ -94,14 +103,15 @@ func newNode( scheduler.RegisterProtocol(latch) node := &node{ - groupParameters: groupParameters, - chain: chain, - btcChain: btcChain, - netProvider: netProvider, - walletRegistry: walletRegistry, - walletDispatcher: newWalletDispatcher(), - protocolLatch: latch, - signingExecutors: make(map[string]*signingExecutor), + groupParameters: groupParameters, + chain: chain, + btcChain: btcChain, + netProvider: netProvider, + walletRegistry: walletRegistry, + walletDispatcher: newWalletDispatcher(), + protocolLatch: latch, + signingExecutors: make(map[string]*signingExecutor), + coordinationExecutors: make(map[string]*coordinationExecutor), } // Only the operator address is known at this point and can be pre-fetched. @@ -291,6 +301,87 @@ func (n *node) getSigningExecutor( return executor, true, nil } +// getCoordinationExecutor gets the coordination executor responsible for +// executing coordination related to a specific wallet whose part is controlled +// by this node. The second boolean return value indicates whether the node +// controls at least one signer for the given wallet. +func (n *node) getCoordinationExecutor( + walletPublicKey *ecdsa.PublicKey, +) (*coordinationExecutor, bool, error) { + n.coordinationExecutorsMutex.Lock() + defer n.coordinationExecutorsMutex.Unlock() + + walletPublicKeyBytes, err := marshalPublicKey(walletPublicKey) + if err != nil { + return nil, false, fmt.Errorf("cannot marshal wallet public key: [%v]", err) + } + + executorKey := hex.EncodeToString(walletPublicKeyBytes) + + if executor, exists := n.coordinationExecutors[executorKey]; exists { + return executor, true, nil + } + + executorLogger := logger.With( + zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), + ) + + signers := n.walletRegistry.getSigners(walletPublicKey) + if len(signers) == 0 { + // This is not an error because the node simply does not control + // the given wallet. + return nil, false, nil + } + + // All signers belong to one wallet. Take that wallet from the + // first signer. + wallet := signers[0].wallet + + channelName := fmt.Sprintf( + "%s-%s-coordination", + ProtocolName, + hex.EncodeToString(walletPublicKeyBytes), + ) + + broadcastChannel, err := n.netProvider.BroadcastChannelFor(channelName) + if err != nil { + return nil, false, fmt.Errorf("failed to get broadcast channel: [%v]", err) + } + + // TODO: Register unmarshalers + + membershipValidator := group.NewMembershipValidator( + executorLogger, + wallet.signingGroupOperators, + n.chain.Signing(), + ) + + err = broadcastChannel.SetFilter(membershipValidator.IsInGroup) + if err != nil { + return nil, false, fmt.Errorf( + "could not set filter for channel [%v]: [%v]", + broadcastChannel.Name(), + err, + ) + } + + executorLogger.Infof( + "coordination executor created; controlling [%v] signers", + len(signers), + ) + + executor := newCoordinationExecutor( + signers, + broadcastChannel, + membershipValidator, + n.protocolLatch, + ) + + n.coordinationExecutors[executorKey] = executor + + return executor, true, nil +} + // handleHeartbeatRequest handles an incoming wallet heartbeat request. // First, it determines whether the node is supposed to do an action by checking // whether any of the request's target wallet signers are under the node's control. @@ -552,6 +643,165 @@ func (n *node) handleRedemptionProposal( walletActionLogger.Infof("wallet action dispatched successfully") } +// coordinationLayerSettings represents settings for the coordination layer. +type coordinationLayerSettings struct { + // executeCoordinationProcedureFn is a function executing the coordination + // procedure for the given wallet and coordination window. + executeCoordinationProcedureFn func( + node *node, + window *coordinationWindow, + walletPublicKey *ecdsa.PublicKey, + ) (*coordinationResult, bool) + + // processCoordinationResultFn is a function processing the given + // coordination result. + processCoordinationResultFn func( + node *node, + result *coordinationResult, + ) +} + +// runCoordinationLayer starts the coordination layer of the node. It is +// responsible for detecting new coordination windows, running coordination +// procedures for all wallets controlled by the node, and processing +// coordination results. +func (n *node) runCoordinationLayer( + ctx context.Context, + settings ...*coordinationLayerSettings, +) error { + // Resolve settings for the coordination layer. + var cls *coordinationLayerSettings + switch len(settings) { + case 1: + cls = settings[0] + default: + cls = &coordinationLayerSettings{ + executeCoordinationProcedureFn: executeCoordinationProcedure, + processCoordinationResultFn: processCoordinationResult, + } + } + + blockCounter, err := n.chain.BlockCounter() + if err != nil { + return fmt.Errorf("cannot get block counter: [%w]", err) + } + + coordinationResultChan := make(chan *coordinationResult) + + // Prepare a callback function that will be called every time a new + // coordination window is detected. + onWindowFn := func(window *coordinationWindow) { + // Fetch all wallets controlled by the node. It is important to + // get the wallets every time the window is triggered as the + // node may have started controlling a new wallet in the meantime. + walletsPublicKeys := n.walletRegistry.getWalletsPublicKeys() + + for _, currentWalletPublicKey := range walletsPublicKeys { + // Run an independent coordination procedure for the given wallet + // in a separate goroutine. The coordination result will be sent + // to the coordination result channel. + go func(walletPublicKey *ecdsa.PublicKey) { + result, ok := cls.executeCoordinationProcedureFn( + n, + window, + walletPublicKey, + ) + if ok { + coordinationResultChan <- result + } + }(currentWalletPublicKey) + } + } + + // Start the coordination windows watcher. + go watchCoordinationWindows( + ctx, + blockCounter.WatchBlocks, + onWindowFn, + ) + + // Start the coordination result processor. + go func() { + for { + select { + case result := <-coordinationResultChan: + go cls.processCoordinationResultFn(n, result) + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +// executeCoordinationProcedure executes the coordination procedure for the +// given wallet and coordination window. +func executeCoordinationProcedure( + node *node, + window *coordinationWindow, + walletPublicKey *ecdsa.PublicKey, +) (*coordinationResult, bool) { + walletPublicKeyBytes, err := marshalPublicKey(walletPublicKey) + if err != nil { + logger.Errorf("cannot marshal wallet public key: [%v]", err) + return nil, false + } + + procedureLogger := logger.With( + zap.Uint64("coordinationBlock", window.coordinationBlock), + zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), + ) + + procedureLogger.Infof("starting coordination procedure") + + executor, ok, err := node.getCoordinationExecutor(walletPublicKey) + if err != nil { + procedureLogger.Errorf("cannot get coordination executor: [%v]", err) + return nil, false + } + // This check is actually redundant. We know the node controls some + // wallet signers as we just got the wallet from the registry. + // However, we are doing it just in case. The API contract of + // getWalletsPublicKeys and/or getCoordinationExecutor may change one day. + if !ok { + procedureLogger.Infof("node does not control signers of this wallet") + return nil, false + } + + result, err := executor.coordinate(window) + if err != nil { + procedureLogger.Errorf("coordination procedure failed: [%v]", err) + return nil, false + } + + procedureLogger.Infof( + "coordination procedure finished successfully with result [%s]", + result, + ) + + return result, true +} + +// processCoordinationResult processes the given coordination result. +func processCoordinationResult(node *node, result *coordinationResult) { + logger.Infof("processing coordination result [%s]", result) + + // TODO: Record coordination faults. + + // TODO: Detect proposal type and run the appropriate handler. + switch result.proposal.actionType() { + case ActionHeartbeat: + // node.handleHeartbeatRequest() + case ActionDepositSweep: + // node.handleDepositSweepProposal() + case ActionRedemption: + // node.handleRedemptionProposal() + default: + logger.Errorf("no handler for coordination result [%s]", result) + } +} + // waitForBlockFn represents a function blocking the execution until the given // block height. type waitForBlockFn func(context.Context, uint64) error diff --git a/pkg/tbtc/node_test.go b/pkg/tbtc/node_test.go index fff1aa1cef..5fc3ca15d0 100644 --- a/pkg/tbtc/node_test.go +++ b/pkg/tbtc/node_test.go @@ -1,11 +1,13 @@ package tbtc import ( + "context" "crypto/ecdsa" "encoding/hex" "fmt" "reflect" "testing" + "time" "github.com/keep-network/keep-common/pkg/persistence" "github.com/keep-network/keep-core/internal/testutils" @@ -134,6 +136,269 @@ func TestNode_GetSigningExecutor(t *testing.T) { } } +func TestNode_GetCoordinationExecutor(t *testing.T) { + groupParameters := &GroupParameters{ + GroupSize: 5, + GroupQuorum: 4, + HonestThreshold: 3, + } + + localChain := Connect() + localProvider := local.Connect() + + signer := createMockSigner(t) + + // Populate the mock keystore with the mock signer's data. This is + // required to make the node controlling the signer's wallet. + keyStorePersistence := createMockKeyStorePersistence(t, signer) + + node, err := newNode( + groupParameters, + localChain, + newLocalBitcoinChain(), + localProvider, + keyStorePersistence, + &mockPersistenceHandle{}, + generator.StartScheduler(), + Config{}, + ) + if err != nil { + t.Fatal(err) + } + + walletPublicKey := signer.wallet.publicKey + walletPublicKeyBytes, err := marshalPublicKey(walletPublicKey) + if err != nil { + t.Fatal(err) + } + + testutils.AssertIntsEqual( + t, + "cache size", + 0, + len(node.coordinationExecutors), + ) + + executor, ok, err := node.getCoordinationExecutor(walletPublicKey) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("node is supposed to control wallet signers") + } + + testutils.AssertIntsEqual( + t, + "cache size", + 1, + len(node.coordinationExecutors), + ) + + testutils.AssertIntsEqual( + t, + "signers count", + 1, + len(executor.signers), + ) + + if !reflect.DeepEqual(signer, executor.signers[0]) { + t.Errorf("executor holds an unexpected signer") + } + + expectedChannel := fmt.Sprintf( + "%s-%s-coordination", + ProtocolName, + hex.EncodeToString(walletPublicKeyBytes), + ) + testutils.AssertStringsEqual( + t, + "broadcast channel", + expectedChannel, + executor.broadcastChannel.Name(), + ) + + _, ok, err = node.getCoordinationExecutor(walletPublicKey) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("node is supposed to control wallet signers") + } + + // The executor was already created in the previous call so cached instance + // should be returned and no new executors should be created. + testutils.AssertIntsEqual( + t, + "cache size", + 1, + len(node.coordinationExecutors), + ) + + // Construct an arbitrary public key representing a wallet that is not + // controlled by the node. We need to make sure the public key's points + // are on the curve to avoid troubles during processing. + x, y := walletPublicKey.Curve.Double(walletPublicKey.X, walletPublicKey.Y) + nonControlledWalletPublicKey := &ecdsa.PublicKey{ + Curve: walletPublicKey.Curve, + X: x, + Y: y, + } + + _, ok, err = node.getCoordinationExecutor(nonControlledWalletPublicKey) + if err != nil { + t.Fatal(err) + } + if ok { + t.Fatal("node is not supposed to control wallet signers") + } +} + +func TestNode_RunCoordinationLayer(t *testing.T) { + groupParameters := &GroupParameters{ + GroupSize: 5, + GroupQuorum: 4, + HonestThreshold: 3, + } + + blockTime := 1 * time.Millisecond + + localChain := Connect(blockTime) + localProvider := local.Connect() + + signer := createMockSigner(t) + + // Populate the mock keystore with the mock signer's data. This is + // required to make the node controlling the signer's wallet. + keyStorePersistence := createMockKeyStorePersistence(t, signer) + + n, err := newNode( + groupParameters, + localChain, + newLocalBitcoinChain(), + localProvider, + keyStorePersistence, + &mockPersistenceHandle{}, + generator.StartScheduler(), + Config{}, + ) + if err != nil { + t.Fatal(err) + } + + // Mock the coordination procedure execution. Return predefined results + // on specific coordination windows. + executeCoordinationProcedureFn := func( + _ *node, + window *coordinationWindow, + walletPublicKey *ecdsa.PublicKey, + ) (*coordinationResult, bool) { + if signer.wallet.publicKey.Equal(walletPublicKey) { + result, ok := map[uint64]*coordinationResult{ + 900: { + proposal: &mockCoordinationProposal{ActionDepositSweep}, + }, + // Omit window at block 1800 to make sure the layer doesn't + // crash if no result is produced. + 2700: { + proposal: &mockCoordinationProposal{ActionRedemption}, + }, + // Put some trash value to make sure coordination windows + // are distributed correctly. + 2705: { + proposal: &mockCoordinationProposal{ActionMovingFunds}, + }, + 3600: { + proposal: &mockCoordinationProposal{ActionNoop}, + }, + 4500: { + proposal: &mockCoordinationProposal{ActionMovedFundsSweep}, + }, + }[window.coordinationBlock] + + return result, ok + } + + return nil, false + } + + // Simply add processed results to the list. + var processedResults []*coordinationResult + processCoordinationResultFn := func( + _ *node, + result *coordinationResult, + ) { + processedResults = append(processedResults, result) + } + + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + err = n.runCoordinationLayer( + ctx, + &coordinationLayerSettings{ + executeCoordinationProcedureFn: executeCoordinationProcedureFn, + processCoordinationResultFn: processCoordinationResultFn, + }, + ) + if err != nil { + t.Fatal(err) + } + + // Wait until the second-last coordination window passes. + err = localChain.blockCounter.WaitForBlockHeight(4000) + if err != nil { + t.Fatal(err) + } + + // Stop coordination layer. As we are between the second-last and the last + // coordination window, the last window should not be processed. This + // allows us to test that the coordination layer's shutdown works as expected. + cancelCtx() + + // Wait until the last coordination window passes. + err = localChain.blockCounter.WaitForBlockHeight(5000) + if err != nil { + t.Fatal(err) + } + + testutils.AssertIntsEqual( + t, + "processed results count", + 3, + len(processedResults), + ) + testutils.AssertStringsEqual( + t, + "first result", + ActionDepositSweep.String(), + processedResults[0].proposal.actionType().String(), + ) + testutils.AssertStringsEqual( + t, + "second result", + ActionRedemption.String(), + processedResults[1].proposal.actionType().String(), + ) + testutils.AssertStringsEqual( + t, + "third result", + ActionNoop.String(), + processedResults[2].proposal.actionType().String(), + ) +} + +type mockCoordinationProposal struct { + action WalletActionType +} + +func (mcp *mockCoordinationProposal) actionType() WalletActionType { + return mcp.action +} + +func (mcp *mockCoordinationProposal) validityBlocks() uint64 { + panic("unsupported") +} + // createMockSigner creates a mock signer instance that can be used for // test cases that needs a placeholder signer. The produced signer cannot // be used to test actual signing scenarios. diff --git a/pkg/tbtc/redemption.go b/pkg/tbtc/redemption.go index 472966bf6f..5098caf6eb 100644 --- a/pkg/tbtc/redemption.go +++ b/pkg/tbtc/redemption.go @@ -3,6 +3,7 @@ package tbtc import ( "crypto/ecdsa" "fmt" + "math/big" "time" "go.uber.org/zap" @@ -14,6 +15,12 @@ import ( ) const ( + // redemptionProposalValidityBlocks determines the redemption proposal + // validity time expressed in blocks. In other words, this is the worst-case + // time for a redemption during which the wallet is busy and cannot take + // another actions. The value of 600 blocks is roughly 2 hours, assuming + // 12 seconds per block. + redemptionProposalValidityBlocks = 600 // redemptionProposalConfirmationBlocks determines the block length of the // confirmation period on the host chain that is preserved after a // redemption proposal submission. @@ -44,6 +51,22 @@ const ( redemptionBroadcastCheckDelay = 1 * time.Minute ) +// RedemptionProposal represents a redemption proposal issued by a wallet's +// coordination leader. +type RedemptionProposal struct { + WalletPublicKeyHash [20]byte + RedeemersOutputScripts []bitcoin.Script + RedemptionTxFee *big.Int +} + +func (rp *RedemptionProposal) actionType() WalletActionType { + return ActionRedemption +} + +func (rp *RedemptionProposal) validityBlocks() uint64 { + return redemptionProposalValidityBlocks +} + // RedemptionTransactionShape is an enum describing the shape of // a Bitcoin redemption transaction. type RedemptionTransactionShape uint8 diff --git a/pkg/tbtc/registry.go b/pkg/tbtc/registry.go index e0a51e5a8b..80334f7475 100644 --- a/pkg/tbtc/registry.go +++ b/pkg/tbtc/registry.go @@ -77,6 +77,21 @@ func newWalletRegistry(persistence persistence.ProtectedHandle) *walletRegistry } } +// getWalletsPublicKeys returns public keys of all registered wallets. +func (wr *walletRegistry) getWalletsPublicKeys() []*ecdsa.PublicKey { + wr.mutex.Lock() + defer wr.mutex.Unlock() + + keys := make([]*ecdsa.PublicKey, 0) + for _, value := range wr.walletCache { + // We can take the wallet from the first signer. All signers for the + // given cache value belong to the same wallet. + keys = append(keys, value.signers[0].wallet.publicKey) + } + + return keys +} + // registerSigner registers the given signer using in the walletRegistry. func (wr *walletRegistry) registerSigner(signer *signer) error { wr.mutex.Lock() diff --git a/pkg/tbtc/registry_test.go b/pkg/tbtc/registry_test.go index e6f825a63b..2f10f19f88 100644 --- a/pkg/tbtc/registry_test.go +++ b/pkg/tbtc/registry_test.go @@ -180,6 +180,29 @@ func TestWalletRegistry_PrePopulateWalletCache(t *testing.T) { } } +func TestWalletRegistry_GetWalletsPublicKeys(t *testing.T) { + persistenceHandle := &mockPersistenceHandle{} + + walletRegistry := newWalletRegistry(persistenceHandle) + + signer := createMockSigner(t) + + err := walletRegistry.registerSigner(signer) + if err != nil { + t.Fatal(err) + } + + keys := walletRegistry.getWalletsPublicKeys() + + testutils.AssertIntsEqual(t, "keys count", 1, len(keys)) + testutils.AssertBoolsEqual( + t, + "keys equal", + true, + keys[0].Equal(signer.wallet.publicKey), + ) +} + func TestWalletStorage_SaveSigner(t *testing.T) { persistenceHandle := &mockPersistenceHandle{} diff --git a/pkg/tbtc/tbtc.go b/pkg/tbtc/tbtc.go index 6cb8ad1b03..f36f6bfe00 100644 --- a/pkg/tbtc/tbtc.go +++ b/pkg/tbtc/tbtc.go @@ -101,6 +101,11 @@ func Initialize( return fmt.Errorf("cannot set up TBTC node: [%v]", err) } + err = node.runCoordinationLayer(ctx) + if err != nil { + return fmt.Errorf("cannot run coordination layer: [%w]", err) + } + deduplicator := newDeduplicator() if clientInfo != nil { diff --git a/pkg/tbtc/wallet.go b/pkg/tbtc/wallet.go index 0a1ff1b67e..33e031ba0d 100644 --- a/pkg/tbtc/wallet.go +++ b/pkg/tbtc/wallet.go @@ -373,11 +373,7 @@ func (w *wallet) String() string { w.publicKey.Y, ) - return fmt.Sprintf( - "wallet [0x%x] with a signing group of [%v]", - publicKey, - len(w.signingGroupOperators), - ) + return fmt.Sprintf("public key [0x%x]", publicKey) } // DetermineWalletMainUtxo determines the plain-text wallet main UTXO @@ -636,7 +632,7 @@ func newSigner( func (s *signer) String() string { return fmt.Sprintf( - "signer with index [%v] of %s", + "signer with index [%v] of wallet [%s]", s.signingGroupMemberIndex, &s.wallet, )