Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC 12 Implementation: Coordination layer groundwork #3744

Merged
merged 13 commits into from
Nov 27, 2023
Merged
18 changes: 13 additions & 5 deletions pkg/chain/local_v1/blockcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type watcher struct {
channel chan uint64
}

var blockTime = time.Duration(500 * time.Millisecond)
var defaultBlockTime = 500 * time.Millisecond

func (lbc *localBlockCounter) WaitForBlockHeight(blockNumber uint64) error {
waiter, err := lbc.BlockHeightWaiter(blockNumber)
Expand Down Expand Up @@ -88,8 +88,16 @@ func (lbc *localBlockCounter) WatchBlocks(ctx context.Context) <-chan uint64 {

// count is an internal function that counts up time to simulate the generation
// of blocks.
func (lbc *localBlockCounter) count() {
ticker := time.NewTicker(blockTime)
func (lbc *localBlockCounter) count(blockTime ...time.Duration) {
var resolvedBlockTime time.Duration
switch len(blockTime) {
case 1:
resolvedBlockTime = blockTime[0]
default:
resolvedBlockTime = defaultBlockTime
}

ticker := time.NewTicker(resolvedBlockTime)

for range ticker.C {
lbc.structMutex.Lock()
Expand Down Expand Up @@ -127,10 +135,10 @@ func (lbc *localBlockCounter) count() {
// BlockCounter creates a BlockCounter that runs completely locally. It is
// designed to simply increase block height at a set time interval in the
// background.
func BlockCounter() (chain.BlockCounter, error) {
func BlockCounter(blockTime ...time.Duration) (chain.BlockCounter, error) {
counter := localBlockCounter{blockHeight: 0, waiters: make(map[uint64][]chan uint64)}

go counter.count()
go counter.count(blockTime...)

return &counter, nil
}
6 changes: 3 additions & 3 deletions pkg/chain/local_v1/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,13 @@ func TestLocalBlockHeightWaiter(t *testing.T) {
},
"returns immediately for block height already reached": {
blockHeight: 2,
initialDelay: 3 * blockTime,
initialDelay: 3 * defaultBlockTime,
expectedWaitTime: 0,
},
"waits for block height not yet reached": {
blockHeight: 5,
initialDelay: 2 * blockTime,
expectedWaitTime: 3 * blockTime,
initialDelay: 2 * defaultBlockTime,
expectedWaitTime: 3 * defaultBlockTime,
},
}

Expand Down
24 changes: 6 additions & 18 deletions pkg/tbtc/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,26 +363,19 @@ type WalletCoordinatorChain interface {

// HeartbeatRequestSubmittedEvent represents a wallet heartbeat request
// submitted to the chain.
//
// TODO: Remove this type and all related code.
type HeartbeatRequestSubmittedEvent struct {
WalletPublicKeyHash [20]byte
Message []byte
Coordinator chain.Address
BlockNumber uint64
}

// DepositSweepProposal represents a deposit sweep proposal submitted to the chain.
type DepositSweepProposal struct {
WalletPublicKeyHash [20]byte
DepositsKeys []struct {
FundingTxHash bitcoin.Hash
FundingOutputIndex uint32
}
SweepTxFee *big.Int
DepositsRevealBlocks []*big.Int
}

// DepositSweepProposalSubmittedEvent represents a deposit sweep proposal
// submission event.
//
// TODO: Remove this type and all related code.
type DepositSweepProposalSubmittedEvent struct {
Proposal *DepositSweepProposal
Coordinator chain.Address
Expand All @@ -404,6 +397,8 @@ type DepositSweepProposalSubmittedEventFilter struct {

// RedemptionProposalSubmittedEvent represents a redemption proposal
// submission event.
//
// TODO: Remove this type and all related code.
type RedemptionProposalSubmittedEvent struct {
Proposal *RedemptionProposal
Coordinator chain.Address
Expand All @@ -423,13 +418,6 @@ type RedemptionProposalSubmittedEventFilter struct {
WalletPublicKeyHash [20]byte
}

// RedemptionProposal represents a redemption proposal submitted to the chain.
type RedemptionProposal struct {
WalletPublicKeyHash [20]byte
RedeemersOutputScripts []bitcoin.Script
RedemptionTxFee *big.Int
}

// RedemptionRequestedEvent represents a redemption requested event.
type RedemptionRequestedEvent struct {
WalletPublicKeyHash [20]byte
Expand Down
11 changes: 7 additions & 4 deletions pkg/tbtc/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,19 +794,22 @@ func buildRedemptionProposalValidationKey(
}

// Connect sets up the local chain.
func Connect() *localChain {
func Connect(blockTime ...time.Duration) *localChain {
operatorPrivateKey, _, err := operator.GenerateKeyPair(local_v1.DefaultCurve)
if err != nil {
panic(err)
}

return ConnectWithKey(operatorPrivateKey)
return ConnectWithKey(operatorPrivateKey, blockTime...)
}

// ConnectWithKey sets up the local chain using the provided operator private
// key.
func ConnectWithKey(operatorPrivateKey *operator.PrivateKey) *localChain {
blockCounter, _ := local_v1.BlockCounter()
func ConnectWithKey(
operatorPrivateKey *operator.PrivateKey,
blockTime ...time.Duration,
) *localChain {
blockCounter, _ := local_v1.BlockCounter(blockTime...)

localChain := &localChain{
dkgResultSubmissionHandlers: make(
Expand Down
257 changes: 257 additions & 0 deletions pkg/tbtc/coordination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package tbtc

import (
"context"
"fmt"
"github.com/keep-network/keep-core/pkg/chain"
"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/net"
"github.com/keep-network/keep-core/pkg/protocol/group"
"golang.org/x/sync/semaphore"
)

const (
// coordinationFrequencyBlocks is the number of blocks between two
// consecutive coordination windows.
coordinationFrequencyBlocks = 900
// coordinationActivePhaseDurationBlocks is the number of blocks in the
// active phase of the coordination window. The active phase is the
// phase during which the communication between the coordination leader and
// their followers is allowed.
coordinationActivePhaseDurationBlocks = 80
// coordinationPassivePhaseDurationBlocks is the number of blocks in the
// passive phase of the coordination window. The passive phase is the
// phase during which communication is not allowed. Participants are
// expected to validate the result of the coordination and prepare for
// execution of the proposed wallet action.
coordinationPassivePhaseDurationBlocks = 20
// coordinationDurationBlocks is the number of blocks in a single
// coordination window.
coordinationDurationBlocks = coordinationActivePhaseDurationBlocks +
coordinationPassivePhaseDurationBlocks
)

// errCoordinationExecutorBusy is an error returned when the coordination
// executor cannot execute the requested coordination due to an ongoing one.
var errCoordinationExecutorBusy = fmt.Errorf("coordination executor is busy")

// coordinationWindow represents a single coordination window. The coordination
// block is the first block of the window.
type coordinationWindow struct {
// coordinationBlock is the first block of the coordination window.
coordinationBlock uint64
}

// newCoordinationWindow creates a new coordination window for the given
// coordination block.
func newCoordinationWindow(coordinationBlock uint64) *coordinationWindow {
return &coordinationWindow{
coordinationBlock: coordinationBlock,
}
}

// ActivePhaseEndBlock returns the block number at which the active phase
// of the coordination window ends.
func (cw *coordinationWindow) activePhaseEndBlock() uint64 {
return cw.coordinationBlock + coordinationActivePhaseDurationBlocks
}

// EndBlock returns the block number at which the coordination window ends.
func (cw *coordinationWindow) endBlock() uint64 {
return cw.coordinationBlock + coordinationDurationBlocks
}

// isAfter returns true if this coordination window is after the other
// window.
func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool {
if other == nil {
return true
}

return cw.coordinationBlock > other.coordinationBlock
}

// watchCoordinationWindows watches for new coordination windows and runs
// the given callback when a new window is detected. The callback is run
// in a separate goroutine. It is guaranteed that the callback is not run
// twice for the same window. The context passed as the first parameter
// is used to cancel the watch.
func watchCoordinationWindows(
ctx context.Context,
watchBlocksFn func(ctx context.Context) <-chan uint64,
onWindowFn func(window *coordinationWindow),
) {
blocksChan := watchBlocksFn(ctx)
var lastWindow *coordinationWindow

for {
select {
case block := <-blocksChan:
if block%coordinationFrequencyBlocks == 0 {
// Make sure the current window is not the same as the last one.
// There is no guarantee that the block channel will not emit
// the same block again.
if window := newCoordinationWindow(block); window.isAfter(lastWindow) {
lastWindow = window
// Run the callback in a separate goroutine to avoid blocking
// this loop and potentially missing the next block.
go onWindowFn(window)
}
}
case <-ctx.Done():
return
}
}
}

// CoordinationFaultType represents a type of the coordination fault.
type CoordinationFaultType uint8

const (
// FaultUnknown is a fault type used when the fault type is unknown.
FaultUnknown CoordinationFaultType = iota
// FaultLeaderIdleness is a fault type used when the leader was idle, i.e.
// missed their turn to propose a wallet action.
FaultLeaderIdleness
// FaultLeaderMistake is a fault type used when the leader's proposal
// turned out to be invalid.
FaultLeaderMistake
// FaultLeaderImpersonation is a fault type used when the leader was
// impersonated by another operator who raised their own proposal.
FaultLeaderImpersonation
)

func (cft CoordinationFaultType) String() string {
switch cft {
case FaultUnknown:
return "Unknown"
case FaultLeaderIdleness:
return "LeaderIdleness"
case FaultLeaderMistake:
return "FaultLeaderMistake"
case FaultLeaderImpersonation:
return "LeaderImpersonation"
default:
panic("unknown coordination fault type")
}
}

// coordinationFault represents a single coordination fault.
type coordinationFault struct {
// culprit is the address of the operator that is responsible for the fault.
culprit chain.Address
// faultType is the type of the fault.
faultType CoordinationFaultType
}

func (cf *coordinationFault) String() string {
return fmt.Sprintf(
"operator [%s], fault [%s]",
cf.culprit,
cf.faultType,
)
}

// coordinationProposal represents a single action proposal for the given wallet.
type coordinationProposal interface {
// actionType returns the specific type of the walletAction being subject
// of this proposal.
actionType() WalletActionType
// validityBlocks returns the number of blocks for which the proposal is
// valid.
validityBlocks() uint64
}

// noopProposal is a proposal that does not propose any action.
type noopProposal struct{}

func (np *noopProposal) actionType() WalletActionType {
return ActionNoop
}

func (np *noopProposal) validityBlocks() uint64 {
// Panic to make sure that the proposal is not processed by the node.
panic("noop proposal does not have validity blocks")
}

// coordinationResult represents the result of the coordination procedure
// executed for the given wallet in the given coordination window.
type coordinationResult struct {
wallet wallet
window *coordinationWindow
leader chain.Address
proposal coordinationProposal
faults []*coordinationFault
}

func (cr *coordinationResult) String() string {
return fmt.Sprintf(
"wallet [%s], window [%v], leader [%s], proposal [%s], faults [%s]",
&cr.wallet,
cr.window.coordinationBlock,
cr.leader,
cr.proposal.actionType(),
cr.faults,
)
}

// coordinationExecutor is responsible for executing the coordination
// procedure for the given wallet.
type coordinationExecutor struct {
lock *semaphore.Weighted

signers []*signer // TODO: Do we need whole signers?
broadcastChannel net.BroadcastChannel
membershipValidator *group.MembershipValidator
protocolLatch *generator.ProtocolLatch
}

// newCoordinationExecutor creates a new coordination executor for the
// given wallet.
func newCoordinationExecutor(
signers []*signer,
broadcastChannel net.BroadcastChannel,
membershipValidator *group.MembershipValidator,
protocolLatch *generator.ProtocolLatch,
) *coordinationExecutor {
return &coordinationExecutor{
lock: semaphore.NewWeighted(1),
signers: signers,
broadcastChannel: broadcastChannel,
membershipValidator: membershipValidator,
protocolLatch: protocolLatch,
}
}

// wallet returns the wallet this executor is responsible for.
func (ce *coordinationExecutor) wallet() wallet {
// All signers belong to one wallet. Take that wallet from the
// first signer.
return ce.signers[0].wallet
}

// coordinate executes the coordination procedure for the given coordination
// window.
func (ce *coordinationExecutor) coordinate(
window *coordinationWindow,
) (*coordinationResult, error) {
if lockAcquired := ce.lock.TryAcquire(1); !lockAcquired {
return nil, errCoordinationExecutorBusy
}
defer ce.lock.Release(1)

// TODO: Implement coordination logic. Remember about:
// - Setting up the right context
// - Using the protocol latch
// - Using the membership validator
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
// Example result:
result := &coordinationResult{
wallet: ce.wallet(),
window: window,
leader: ce.wallet().signingGroupOperators[0],
proposal: &noopProposal{},
faults: nil,
}

return result, nil
}
Loading