diff --git a/beacon/beacon.go b/beacon/beacon.go index 4a41a03000..c307dfa34f 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -20,6 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/beacon/weakcoin" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/common/types/result" "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" @@ -128,6 +129,7 @@ func New( beacons: make(map[types.EpochID]types.Beacon), ballotsBeacons: make(map[types.EpochID]map[types.Beacon]*beaconWeight), states: make(map[types.EpochID]*state), + results: make(chan result.Beacon, 100), } for _, opt := range opts { opt(pd) @@ -203,6 +205,7 @@ type ProtocolDriver struct { // the map key is the epoch when the ballot is published. the beacon value is calculated in the // previous epoch and used in the current epoch. ballotsBeacons map[types.EpochID]map[types.Beacon]*beaconWeight + results chan result.Beacon // metrics metricsCollector *metrics.BeaconMetricsCollector @@ -250,6 +253,7 @@ func (pd *ProtocolDriver) UpdateBeacon(epoch types.EpochID, beacon types.Beacon) } pd.beacons[epoch] = beacon pd.logger.With().Info("using fallback beacon", epoch, beacon) + pd.onResult(epoch, beacon) return nil } @@ -262,9 +266,26 @@ func (pd *ProtocolDriver) Close() { if err := pd.eg.Wait(); err != nil { pd.logger.With().Info("received error waiting for goroutines to finish", log.Err(err)) } + close(pd.results) pd.logger.Info("beacon goroutines finished") } +func (pd *ProtocolDriver) onResult(epoch types.EpochID, beacon types.Beacon) { + select { + case pd.results <- result.Beacon{Epoch: epoch, Beacon: beacon}: + default: + pd.logger.With().Error("results queue is congested", + log.Uint32("epoch_id", epoch.Uint32()), + log.Stringer("beacon", beacon), + ) + } +} + +// Results notifies waiter that beacon for a target epoch has completed. +func (pd *ProtocolDriver) Results() <-chan result.Beacon { + return pd.results +} + // isClosed returns true if the beacon protocol is shutting down. func (pd *ProtocolDriver) isClosed() bool { select { @@ -478,6 +499,7 @@ func (pd *ProtocolDriver) setBeacon(targetEpoch types.EpochID, beacon types.Beac return fmt.Errorf("persist beacon: %w", err) } pd.beacons[targetEpoch] = beacon + pd.onResult(targetEpoch, beacon) return nil } diff --git a/beacon/beacon_test.go b/beacon/beacon_test.go index 0ae33284dc..4dd3c51e03 100644 --- a/beacon/beacon_test.go +++ b/beacon/beacon_test.go @@ -16,6 +16,7 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/common/types/result" "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/p2p" @@ -267,6 +268,16 @@ func TestBeacon_NoProposals(t *testing.T) { } } +func getNoWait(tb testing.TB, results <-chan result.Beacon) result.Beacon { + select { + case rst := <-results: + return rst + default: + } + require.Fail(tb, "beacon is not available") + return result.Beacon{} +} + func TestBeaconNotSynced(t *testing.T) { tpd := setUpProtocolDriver(t) tpd.mSync.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes() @@ -281,6 +292,8 @@ func TestBeaconNotSynced(t *testing.T) { bootstrap := types.Beacon{1, 2, 3, 4} require.NoError(t, tpd.UpdateBeacon(types.EpochID(2), bootstrap)) got, err = tpd.GetBeacon(types.EpochID(2)) + require.Equal(t, got, getNoWait(t, tpd.Results()).Beacon) + require.NoError(t, err) require.Equal(t, bootstrap, got) diff --git a/cmd/node/node.go b/cmd/node/node.go index 32832e8275..33696c2923 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -54,6 +54,7 @@ import ( "github.com/spacemeshos/go-spacemesh/proposals" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" "github.com/spacemeshos/go-spacemesh/system" @@ -522,7 +523,7 @@ func (app *App) initServices( if trtlCfg.BadBeaconVoteDelayLayers == 0 { trtlCfg.BadBeaconVoteDelayLayers = app.Config.LayersPerEpoch } - trtl, err := tortoise.New(app.cachedDB, beaconProtocol, + trtl, err := tortoise.Recover(app.cachedDB, beaconProtocol, tortoise.WithContext(ctx), tortoise.WithLogger(app.addLogger(TrtlLogger, lg)), tortoise.WithConfig(trtlCfg), @@ -530,6 +531,13 @@ func (app *App) initServices( if err != nil { return fmt.Errorf("can't recover tortoise state: %w", err) } + app.eg.Go(func() error { + for rst := range beaconProtocol.Results() { + trtl.OnBeacon(rst.Epoch, rst.Beacon) + } + app.log.Debug("beacon results watcher exited") + return nil + }) executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg)) msh, err := mesh.NewMesh(app.cachedDB, clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg)) @@ -656,6 +664,7 @@ func (app *App) initServices( patrol, app.hOracle, clock, + tortoiseWeakCoin{db: app.cachedDB, tortoise: trtl}, app.addLogger(HareLogger, lg), ) @@ -1270,3 +1279,16 @@ func decodeLoggers(cfg config.LoggerConfig) (map[string]string, error) { } return rst, nil } + +type tortoiseWeakCoin struct { + db sql.Executor + tortoise system.Tortoise +} + +func (w tortoiseWeakCoin) Set(lid types.LayerID, value bool) error { + if err := layers.SetWeakCoin(w.db, lid, value); err != nil { + return err + } + w.tortoise.OnWeakCoin(lid, value) + return nil +} diff --git a/common/types/ballot.go b/common/types/ballot.go index 72e94ceec5..37d96a3dd2 100644 --- a/common/types/ballot.go +++ b/common/types/ballot.go @@ -185,22 +185,24 @@ func (v *Votes) MarshalLogObject(encoder log.ObjectEncoder) error { return nil } -// Vote additionally carries layer id and height -// in order for the tortoise to count votes without downloading block body. -type Vote struct { +type BlockHeader struct { ID BlockID LayerID LayerID Height uint64 } // MarshalLogObject implements logging interface. -func (s *Vote) MarshalLogObject(encoder log.ObjectEncoder) error { - encoder.AddString("id", s.ID.String()) - encoder.AddUint32("layer", s.LayerID.Uint32()) - encoder.AddUint64("height", s.Height) +func (header *BlockHeader) MarshalLogObject(encoder log.ObjectEncoder) error { + encoder.AddString("id", header.ID.String()) + encoder.AddUint32("layer", header.LayerID.Uint32()) + encoder.AddUint64("height", header.Height) return nil } +// Vote additionally carries layer id and height +// in order for the tortoise to count votes without downloading block body. +type Vote = BlockHeader + // Opinion is a tuple from opinion hash and votes that decode to opinion hash. type Opinion struct { Hash Hash32 @@ -291,7 +293,7 @@ func (b *Ballot) MarshalLogObject(encoder log.ObjectEncoder) error { encoder.AddUint32("layer_id", b.Layer.Uint32()) encoder.AddUint32("epoch_id", uint32(b.Layer.GetEpoch())) encoder.AddString("smesher", b.SmesherID.String()) - encoder.AddString("opinion hash", b.OpinionHash.String()) + encoder.AddString("opinion hash", b.OpinionHash.ShortString()) encoder.AddString("base_ballot", b.Votes.Base.String()) encoder.AddInt("support", len(b.Votes.Support)) encoder.AddInt("against", len(b.Votes.Against)) diff --git a/common/types/ballot_scale.go b/common/types/ballot_scale.go index 3efa4a6942..b45bda376f 100644 --- a/common/types/ballot_scale.go +++ b/common/types/ballot_scale.go @@ -259,7 +259,7 @@ func (t *Votes) DecodeScale(dec *scale.Decoder) (total int, err error) { total += n } { - field, n, err := scale.DecodeStructSliceWithLimit[Vote](dec, 10000) + field, n, err := scale.DecodeStructSliceWithLimit[BlockHeader](dec, 10000) if err != nil { return total, err } @@ -267,7 +267,7 @@ func (t *Votes) DecodeScale(dec *scale.Decoder) (total int, err error) { t.Support = field } { - field, n, err := scale.DecodeStructSliceWithLimit[Vote](dec, 10000) + field, n, err := scale.DecodeStructSliceWithLimit[BlockHeader](dec, 10000) if err != nil { return total, err } @@ -285,7 +285,7 @@ func (t *Votes) DecodeScale(dec *scale.Decoder) (total int, err error) { return total, nil } -func (t *Vote) EncodeScale(enc *scale.Encoder) (total int, err error) { +func (t *BlockHeader) EncodeScale(enc *scale.Encoder) (total int, err error) { { n, err := scale.EncodeByteArray(enc, t.ID[:]) if err != nil { @@ -310,7 +310,7 @@ func (t *Vote) EncodeScale(enc *scale.Encoder) (total int, err error) { return total, nil } -func (t *Vote) DecodeScale(dec *scale.Decoder) (total int, err error) { +func (t *BlockHeader) DecodeScale(dec *scale.Decoder) (total int, err error) { { n, err := scale.DecodeByteArray(dec, t.ID[:]) if err != nil { diff --git a/common/types/result/beacon.go b/common/types/result/beacon.go new file mode 100644 index 0000000000..7c13206c69 --- /dev/null +++ b/common/types/result/beacon.go @@ -0,0 +1,8 @@ +package result + +import "github.com/spacemeshos/go-spacemesh/common/types" + +type Beacon struct { + Epoch types.EpochID + Beacon types.Beacon +} diff --git a/hare/flows_test.go b/hare/flows_test.go index 1c80cccb1b..1eab4556a6 100644 --- a/hare/flows_test.go +++ b/hare/flows_test.go @@ -126,6 +126,7 @@ func (m *p2pManipulator) Publish(ctx context.Context, protocol string, payload [ type hareWithMocks struct { *Hare mockRoracle *mocks.MockRolacle + mockCoin *mocks.MockweakCoin } func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockClock, p2p pubsub.PublishSubsciber, name string) *hareWithMocks { @@ -147,7 +148,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc mockSyncS.EXPECT().IsBeaconSynced(gomock.Any()).Return(true).AnyTimes() mockRoracle := mocks.NewMockRolacle(ctrl) - + mockCoin := mocks.NewMockweakCoin(ctrl) hare := New( nil, tcfg, @@ -162,6 +163,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc patrol, mockStateQ, clock, + mockCoin, logtest.New(tb).WithName(name+"_"+signer.PublicKey().ShortString()), withMesh(msh), ) @@ -170,6 +172,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc return &hareWithMocks{ Hare: hare, mockRoracle: mockRoracle, + mockCoin: mockCoin, } } @@ -269,7 +272,6 @@ func Test_multipleCPs(t *testing.T) { mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes() mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes() mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).AnyTimes() - mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).AnyTimes() for lid := types.GetEffectiveGenesis().Add(1); !lid.After(finalLyr); lid = lid.Add(1) { mockMesh.EXPECT().Proposals(lid).Return(pList[lid], nil) for _, p := range pList[lid] { @@ -294,6 +296,7 @@ func Test_multipleCPs(t *testing.T) { h.mockRoracle.EXPECT().Proof(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(types.EmptyVrfSignature, nil).AnyTimes() h.mockRoracle.EXPECT().CalcEligibility(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(uint16(1), nil).AnyTimes() h.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() + h.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes() outputsWaitGroup.Add(1) go func(idx int) { defer outputsWaitGroup.Done() @@ -391,7 +394,6 @@ func Test_multipleCPsAndIterations(t *testing.T) { mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes() mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes() mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).AnyTimes() - mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).AnyTimes() for lid := types.GetEffectiveGenesis().Add(1); !lid.After(finalLyr); lid = lid.Add(1) { mockMesh.EXPECT().Proposals(lid).Return(pList[lid], nil) for _, p := range pList[lid] { @@ -417,6 +419,7 @@ func Test_multipleCPsAndIterations(t *testing.T) { h.mockRoracle.EXPECT().Proof(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(types.EmptyVrfSignature, nil).AnyTimes() h.mockRoracle.EXPECT().CalcEligibility(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(uint16(1), nil).AnyTimes() h.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() + h.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes() outputsWaitGroup.Add(1) go func(idx int) { defer outputsWaitGroup.Done() diff --git a/hare/hare.go b/hare/hare.go index 89147daeed..c3c581afc7 100644 --- a/hare/hare.go +++ b/hare/hare.go @@ -19,7 +19,6 @@ import ( "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/ballots" - "github.com/spacemeshos/go-spacemesh/sql/layers" "github.com/spacemeshos/go-spacemesh/sql/proposals" "github.com/spacemeshos/go-spacemesh/system" ) @@ -88,10 +87,6 @@ func (m defaultMesh) Ballot(bid types.BallotID) (*types.Ballot, error) { return ballots.Get(m, bid) } -func (m defaultMesh) SetWeakCoin(lid types.LayerID, wc bool) error { - return layers.SetWeakCoin(m, lid, wc) -} - // Opt for configuring beacon protocol. type Opt func(*Hare) @@ -105,6 +100,7 @@ func withMesh(m mesh) Opt { type Hare struct { log.Log msh mesh + weakCoin weakCoin config config.Config publisher pubsub.Publisher layerClock LayerClock @@ -152,6 +148,7 @@ func New( patrol layerPatrol, stateQ stateQuerier, layerClock LayerClock, + weakCoin weakCoin, logger log.Log, opts ...Opt, ) *Hare { @@ -181,6 +178,7 @@ func New( h.beacons = beacons h.rolacle = rolacle h.patrol = patrol + h.weakCoin = weakCoin h.networkDelta = conf.WakeupDelta h.outputChan = make(chan TerminationOutput, h.config.Hdist) @@ -530,7 +528,7 @@ func (h *Hare) outputCollectionLoop(ctx context.Context) { // collect coinflip, regardless of success logger.With().Debug("recording weak coin result for layer", log.Bool("weak_coin", coin)) - if err := h.msh.SetWeakCoin(layerID, coin); err != nil { + if err := h.weakCoin.Set(layerID, coin); err != nil { logger.With().Error("failed to set weak coin for layer", log.Err(err)) } if err := h.collectOutput(ctx, out); err != nil { diff --git a/hare/hare_rounds_test.go b/hare/hare_rounds_test.go index f13e6a31b8..78259de328 100644 --- a/hare/hare_rounds_test.go +++ b/hare/hare_rounds_test.go @@ -89,7 +89,6 @@ func runNodesFor(t *testing.T, ctx context.Context, nodes, leaders, maxLayers, l mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes() mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes() mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).Return(nil, nil).AnyTimes() - mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() for i := 0; i < nodes; i++ { host := mesh.Hosts()[i] @@ -105,6 +104,7 @@ func runNodesFor(t *testing.T, ctx context.Context, nodes, leaders, maxLayers, l return oracle(layer, round, committeeSize, id, sig, th) }).AnyTimes() th.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() + th.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes() go func() { for out := range th.blockGenCh { validate(out.Layer, th) diff --git a/hare/hare_test.go b/hare/hare_test.go index 5a52c5771f..b3a4d1cff3 100644 --- a/hare/hare_test.go +++ b/hare/hare_test.go @@ -152,6 +152,7 @@ func TestHare_New(t *testing.T) { mocks.NewMocklayerPatrol(ctrl), mocks.NewMockstateQuerier(ctrl), newMockClock(), + mocks.NewMockweakCoin(ctrl), logger, withMesh(mocks.NewMockmesh(ctrl)), ) @@ -222,7 +223,7 @@ func TestHare_OutputCollectionLoop(t *testing.T) { require.NoError(t, err) time.Sleep(1 * time.Second) - mockMesh.EXPECT().SetWeakCoin(lyrID, mo.Coinflip()) + h.mockCoin.EXPECT().Set(lyrID, mo.Coinflip()) h.outputChan <- mo lo := <-h.blockGenCh require.Equal(t, lyrID, lo.Layer) @@ -332,7 +333,7 @@ func TestHare_onTick(t *testing.T) { mockMesh.EXPECT().GetEpochAtx(lyrID.GetEpoch(), h.nodeID).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil) mockMesh.EXPECT().VRFNonce(h.nodeID, lyrID.GetEpoch()).Return(types.VRFPostIndex(1), nil) mockMesh.EXPECT().Proposals(lyrID).Return(pList, nil) - mockMesh.EXPECT().SetWeakCoin(lyrID, gomock.Any()) + h.mockCoin.EXPECT().Set(lyrID, gomock.Any()) mockBeacons := smocks.NewMockBeaconGetter(gomock.NewController(t)) h.beacons = mockBeacons @@ -400,7 +401,7 @@ func TestHare_onTick_notMining(t *testing.T) { mockMesh.EXPECT().GetEpochAtx(lyrID.GetEpoch(), h.nodeID).Return(nil, sql.ErrNotFound) mockMesh.EXPECT().VRFNonce(h.nodeID, lyrID.GetEpoch()).Return(types.VRFPostIndex(1), nil) mockMesh.EXPECT().Proposals(lyrID).Return(pList, nil) - mockMesh.EXPECT().SetWeakCoin(lyrID, gomock.Any()) + h.mockCoin.EXPECT().Set(lyrID, gomock.Any()) mockBeacons := smocks.NewMockBeaconGetter(gomock.NewController(t)) h.beacons = mockBeacons @@ -646,22 +647,22 @@ func TestHare_WeakCoin(t *testing.T) { set := NewSet([]types.ProposalID{{1}, {2}}) // complete + coin flip true - mockMesh.EXPECT().SetWeakCoin(layerID, true) + h.mockCoin.EXPECT().Set(layerID, true) h.outputChan <- mockReport{layerID, set, true, true} require.NoError(t, waitForMsg()) // incomplete + coin flip true - mockMesh.EXPECT().SetWeakCoin(layerID, true) + h.mockCoin.EXPECT().Set(layerID, true) h.outputChan <- mockReport{layerID, set, false, true} require.Error(t, waitForMsg()) // complete + coin flip false - mockMesh.EXPECT().SetWeakCoin(layerID, false) + h.mockCoin.EXPECT().Set(layerID, false) h.outputChan <- mockReport{layerID, set, true, false} require.NoError(t, waitForMsg()) // incomplete + coin flip false - mockMesh.EXPECT().SetWeakCoin(layerID, false) + h.mockCoin.EXPECT().Set(layerID, false) h.outputChan <- mockReport{layerID, set, false, false} require.Error(t, waitForMsg()) } diff --git a/hare/interfaces.go b/hare/interfaces.go index 8ac971ada8..017e264324 100644 --- a/hare/interfaces.go +++ b/hare/interfaces.go @@ -34,8 +34,11 @@ type mesh interface { GetAtxHeader(types.ATXID) (*types.ActivationTxHeader, error) Proposals(types.LayerID) ([]*types.Proposal, error) Ballot(types.BallotID) (*types.Ballot, error) - SetWeakCoin(types.LayerID, bool) error IsMalicious(types.NodeID) (bool, error) AddMalfeasanceProof(types.NodeID, *types.MalfeasanceProof, *sql.Tx) error GetMalfeasanceProof(nodeID types.NodeID) (*types.MalfeasanceProof, error) } + +type weakCoin interface { + Set(types.LayerID, bool) error +} diff --git a/hare/mocks/mocks.go b/hare/mocks/mocks.go index f68ca6f341..54b80f6838 100644 --- a/hare/mocks/mocks.go +++ b/hare/mocks/mocks.go @@ -296,20 +296,6 @@ func (mr *MockmeshMockRecorder) Proposals(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Proposals", reflect.TypeOf((*Mockmesh)(nil).Proposals), arg0) } -// SetWeakCoin mocks base method. -func (m *Mockmesh) SetWeakCoin(arg0 types.LayerID, arg1 bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetWeakCoin", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetWeakCoin indicates an expected call of SetWeakCoin. -func (mr *MockmeshMockRecorder) SetWeakCoin(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWeakCoin", reflect.TypeOf((*Mockmesh)(nil).SetWeakCoin), arg0, arg1) -} - // VRFNonce mocks base method. func (m *Mockmesh) VRFNonce(arg0 types.NodeID, arg1 types.EpochID) (types.VRFPostIndex, error) { m.ctrl.T.Helper() @@ -324,3 +310,40 @@ func (mr *MockmeshMockRecorder) VRFNonce(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VRFNonce", reflect.TypeOf((*Mockmesh)(nil).VRFNonce), arg0, arg1) } + +// MockweakCoin is a mock of weakCoin interface. +type MockweakCoin struct { + ctrl *gomock.Controller + recorder *MockweakCoinMockRecorder +} + +// MockweakCoinMockRecorder is the mock recorder for MockweakCoin. +type MockweakCoinMockRecorder struct { + mock *MockweakCoin +} + +// NewMockweakCoin creates a new mock instance. +func NewMockweakCoin(ctrl *gomock.Controller) *MockweakCoin { + mock := &MockweakCoin{ctrl: ctrl} + mock.recorder = &MockweakCoinMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockweakCoin) EXPECT() *MockweakCoinMockRecorder { + return m.recorder +} + +// Set mocks base method. +func (m *MockweakCoin) Set(arg0 types.LayerID, arg1 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Set", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Set indicates an expected call of Set. +func (mr *MockweakCoinMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockweakCoin)(nil).Set), arg0, arg1) +} diff --git a/log/zap.go b/log/zap.go index 53e4a57c85..843f30e820 100644 --- a/log/zap.go +++ b/log/zap.go @@ -88,6 +88,22 @@ func Stringer(name string, val fmt.Stringer) Field { return Field(zap.Stringer(name, val)) } +type ShortString interface { + ShortString() string +} + +type shortStringAdapter struct { + val ShortString +} + +func (a shortStringAdapter) String() string { + return a.val.ShortString() +} + +func ShortStringer(name string, val ShortString) Field { + return Field(zap.Stringer(name, shortStringAdapter{val: val})) +} + // Binary will encode binary content in base64 when logged. func Binary(name string, val []byte) Field { return Field(zap.Binary(name, val)) diff --git a/mesh/mesh.go b/mesh/mesh.go index f6020b68ef..164667d32a 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -739,7 +739,7 @@ func (msh *Mesh) AddBlockWithTXs(ctx context.Context, block *types.Block) error // add block to the tortoise before storing it // otherwise fetcher will not wait until data is stored in the tortoise - msh.trtl.OnBlock(block) + msh.trtl.OnBlock(block.ToVote()) if err := blocks.Add(msh.cdb, block); err != nil && !errors.Is(err, sql.ErrObjectExists) { return err } diff --git a/mesh/mesh_test.go b/mesh/mesh_test.go index 7a2b9c129d..951ed2c1a7 100644 --- a/mesh/mesh_test.go +++ b/mesh/mesh_test.go @@ -681,7 +681,7 @@ func TestMesh_CallOnBlock(t *testing.T) { block.LayerIndex = types.LayerID(10) block.Initialize() - tm.mockTortoise.EXPECT().OnBlock(&block) + tm.mockTortoise.EXPECT().OnBlock(block.ToVote()) tm.mockState.EXPECT().LinkTXsWithBlock(block.LayerIndex, block.ID(), block.TxIDs) require.NoError(t, tm.AddBlockWithTXs(context.Background(), &block)) } diff --git a/system/mocks/tortoise.go b/system/mocks/tortoise.go index aa0799a0a7..51abe43748 100644 --- a/system/mocks/tortoise.go +++ b/system/mocks/tortoise.go @@ -51,7 +51,7 @@ func (mr *MockTortoiseMockRecorder) LatestComplete() *gomock.Call { } // OnBlock mocks base method. -func (m *MockTortoise) OnBlock(arg0 *types.Block) { +func (m *MockTortoise) OnBlock(arg0 types.BlockHeader) { m.ctrl.T.Helper() m.ctrl.Call(m, "OnBlock", arg0) } @@ -74,6 +74,18 @@ func (mr *MockTortoiseMockRecorder) OnHareOutput(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnHareOutput", reflect.TypeOf((*MockTortoise)(nil).OnHareOutput), arg0, arg1) } +// OnWeakCoin mocks base method. +func (m *MockTortoise) OnWeakCoin(arg0 types.LayerID, arg1 bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnWeakCoin", arg0, arg1) +} + +// OnWeakCoin indicates an expected call of OnWeakCoin. +func (mr *MockTortoiseMockRecorder) OnWeakCoin(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnWeakCoin", reflect.TypeOf((*MockTortoise)(nil).OnWeakCoin), arg0, arg1) +} + // Results mocks base method. func (m *MockTortoise) Results(from, to types.LayerID) ([]result.Layer, error) { m.ctrl.T.Helper() diff --git a/system/tortoise.go b/system/tortoise.go index c9e7a46b6c..d8939ad431 100644 --- a/system/tortoise.go +++ b/system/tortoise.go @@ -11,8 +11,9 @@ import ( // Tortoise is an interface provided by tortoise implementation. type Tortoise interface { - OnBlock(*types.Block) + OnBlock(types.BlockHeader) OnHareOutput(types.LayerID, types.BlockID) + OnWeakCoin(types.LayerID, bool) TallyVotes(context.Context, types.LayerID) Updates() map[types.LayerID]map[types.BlockID]bool LatestComplete() types.LayerID diff --git a/tortoise/algorithm.go b/tortoise/algorithm.go index cb6fde7e71..fcc03729b7 100644 --- a/tortoise/algorithm.go +++ b/tortoise/algorithm.go @@ -8,10 +8,7 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/common/types/result" - "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/log" - "github.com/spacemeshos/go-spacemesh/sql/ballots" - "github.com/spacemeshos/go-spacemesh/system" ) // Config for protocol parameters. @@ -74,7 +71,7 @@ func WithConfig(cfg Config) Opt { } // New creates Tortoise instance. -func New(cdb *datastore.CachedDB, beacons system.BeaconGetter, opts ...Opt) (*Tortoise, error) { +func New(opts ...Opt) (*Tortoise, error) { t := &Tortoise{ ctx: context.Background(), logger: log.NewNop(), @@ -83,41 +80,13 @@ func New(cdb *datastore.CachedDB, beacons system.BeaconGetter, opts ...Opt) (*To for _, opt := range opts { opt(t) } - if t.cfg.Hdist < t.cfg.Zdist { t.logger.With().Panic("hdist must be >= zdist", log.Uint32("hdist", t.cfg.Hdist), log.Uint32("zdist", t.cfg.Zdist), ) } - - latest, err := ballots.LatestLayer(cdb) - if err != nil { - t.logger.With().Panic("failed to load latest layer", - log.Err(err), - ) - } - needsRecovery := latest.After(types.GetEffectiveGenesis()) - - t.trtl = newTurtle( - t.logger, - cdb, - beacons, - t.cfg, - ) - if needsRecovery { - t.logger.With().Info("loading state from disk. make sure to wait until tortoise is ready", - log.Stringer("last layer", latest), - ) - for lid := types.GetEffectiveGenesis().Add(1); !lid.After(latest); lid = lid.Add(1) { - err := t.trtl.onLayer(context.Background(), lid) - if err != nil { - return nil, err - } - } - } else { - t.logger.Info("no state on disk. initialized with genesis") - } + t.trtl = newTurtle(t.logger, t.cfg) return t, nil } @@ -136,6 +105,41 @@ func (t *Tortoise) Updates() map[types.LayerID]map[types.BlockID]bool { return res } +func (t *Tortoise) OnWeakCoin(lid types.LayerID, coin bool) { + t.mu.Lock() + defer t.mu.Unlock() + t.logger.With().Debug("on weakcoin", + log.Uint32("layer_id", lid.Uint32()), + log.Uint32("evicted", t.trtl.evicted.Uint32()), + log.Bool("coin", coin), + ) + if lid <= t.trtl.evicted { + return + } + layer := t.trtl.layer(lid) + if coin { + layer.coinflip = support + } else { + layer.coinflip = against + } +} + +func (t *Tortoise) OnBeacon(eid types.EpochID, beacon types.Beacon) { + t.mu.Lock() + defer t.mu.Unlock() + evicted := t.trtl.evicted.GetEpoch() + t.logger.With().Debug("on beacon", + log.Uint32("epoch_id", eid.Uint32()), + log.Uint32("evicted", evicted.Uint32()), + log.Stringer("beacon", beacon), + ) + if eid <= evicted { + return + } + epoch := t.trtl.epoch(eid) + epoch.beacon = &beacon +} + type encodeConf struct { current *types.LayerID } @@ -181,10 +185,7 @@ func (t *Tortoise) TallyVotes(ctx context.Context, lid types.LayerID) { defer t.mu.Unlock() waitTallyVotes.Observe(float64(time.Since(start).Nanoseconds())) start = time.Now() - if err := t.trtl.onLayer(ctx, lid); err != nil { - errorsCounter.Inc() - t.logger.With().Error("failed on layer", lid, log.Err(err)) - } + t.trtl.onLayer(ctx, lid) executeTallyVotes.Observe(float64(time.Since(start).Nanoseconds())) } @@ -197,17 +198,28 @@ func (t *Tortoise) OnAtx(atx *types.ActivationTxHeader) { t.trtl.onAtx(atx) } -// OnBlock should be called every time new block is received. -func (t *Tortoise) OnBlock(block *types.Block) { +// OnBlock updates tortoise with information that data is available locally. +func (t *Tortoise) OnBlock(header types.BlockHeader) { + start := time.Now() + t.mu.Lock() + defer t.mu.Unlock() + waitBlockDuration.Observe(float64(time.Since(start).Nanoseconds())) + t.trtl.onBlock(header, true, false) +} + +// OnValidBlock inserts block, updates that data is stored locally +// and that block was previously considered valid by tortoise. +func (t *Tortoise) OnValidBlock(header types.BlockHeader) { start := time.Now() t.mu.Lock() defer t.mu.Unlock() waitBlockDuration.Observe(float64(time.Since(start).Nanoseconds())) - t.trtl.onBlock(block.ToVote()) + t.trtl.onBlock(header, true, true) } // OnBallot should be called every time new ballot is received. -// BaseBallot and RefBallot must be always processed first. And ATX must be stored in the database. +// Dependencies (base ballot, ref ballot, active set and its own atx) must +// be processed before ballot. func (t *Tortoise) OnBallot(ballot *types.Ballot) { t.mu.Lock() defer t.mu.Unlock() diff --git a/tortoise/full_test.go b/tortoise/full_test.go index 0535e7ec48..457709810d 100644 --- a/tortoise/full_test.go +++ b/tortoise/full_test.go @@ -3,17 +3,13 @@ package tortoise import ( "math/rand" "testing" - "time" "github.com/spacemeshos/fixed" "github.com/stretchr/testify/require" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/signing" - "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/atxs" ) func TestFullBallotFilter(t *testing.T) { @@ -326,25 +322,22 @@ func TestFullCountVotes(t *testing.T) { tc := tc t.Run(tc.desc, func(t *testing.T) { logger := logtest.New(t) - cdb := datastore.NewCachedDB(sql.InMemory(), logger) + tortoise := defaultAlgorithm(t) var activeset []types.ATXID for i := range tc.activeset { - atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{ - NIPostChallenge: types.NIPostChallenge{}, - NumUnits: 1, - }} atxid := types.ATXID{byte(i + 1)} - atx.SetID(atxid) - atx.SetEffectiveNumUnits(atx.NumUnits) - atx.SetReceived(time.Now()) - vAtx, err := atx.Verify(tc.activeset[i].BaseHeight, tc.activeset[i].TickCount) - require.NoError(t, err) - require.NoError(t, atxs.Add(cdb, vAtx)) + header := &types.ActivationTxHeader{ + ID: atxid, + NumUnits: 1, + EffectiveNumUnits: 1, + BaseTickHeight: tc.activeset[i].BaseHeight, + TickCount: tc.activeset[i].TickCount, + } + header.PublishEpoch = 1 + tortoise.OnAtx(header) activeset = append(activeset, atxid) } - tortoise := defaultAlgorithm(t, cdb) - tortoise.trtl.cdb = cdb consensus := tortoise.trtl consensus.ballotRefs[types.EmptyBallotID] = &ballotInfo{ layer: genesis, @@ -364,11 +357,9 @@ func TestFullCountVotes(t *testing.T) { layerBlocks = append(layerBlocks, b) refs[b.ID()] = b.ToVote() } - consensus.epochs[lid.GetEpoch()] = &epochInfo{ - height: localHeight, - } + consensus.epoch(lid.GetEpoch()).height = localHeight for _, block := range layerBlocks { - consensus.onBlock(block.ToVote()) + tortoise.OnBlock(block.ToVote()) } blocks = append(blocks, layerBlocks) } @@ -381,7 +372,7 @@ func TestFullCountVotes(t *testing.T) { ballot := &types.Ballot{} ballot.EligibilityProofs = []types.VotingEligibility{{J: uint32(j)}} ballot.AtxID = activeset[b.ATX] - ballot.EpochData = &types.EpochData{ActiveSetHash: types.Hash32{1, 2, 3}} + ballot.EpochData = &types.EpochData{ActiveSetHash: types.Hash32{1, 2, 3}, EligibilityCount: 1} ballot.ActiveSet = activeset ballot.Layer = lid // don't vote on genesis for simplicity, diff --git a/tortoise/model/core.go b/tortoise/model/core.go index bba71d9a46..606e0075fe 100644 --- a/tortoise/model/core.go +++ b/tortoise/model/core.go @@ -42,7 +42,7 @@ func newCore(rng *rand.Rand, id string, logger log.Log) *core { } cfg := tortoise.DefaultConfig() cfg.LayerSize = layerSize - c.tortoise, err = tortoise.New(c.cdb, c.beacons, + c.tortoise, err = tortoise.New( tortoise.WithLogger(logger.Named("trtl")), tortoise.WithConfig(cfg), ) @@ -133,7 +133,7 @@ func (c *core) OnMessage(m Messenger, event Message) { m.Send(MessageBallot{Ballot: ballot}) case MessageLayerEnd: if ev.LayerID.After(types.GetEffectiveGenesis()) { - c.tortoise.TallyVotes(context.TODO(), ev.LayerID) + tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, c.beacons, ev.LayerID) m.Notify(EventVerified{ID: c.id, Verified: c.tortoise.LatestComplete(), Layer: ev.LayerID}) } diff --git a/tortoise/recover.go b/tortoise/recover.go new file mode 100644 index 0000000000..0a93dbfcb4 --- /dev/null +++ b/tortoise/recover.go @@ -0,0 +1,93 @@ +package tortoise + +import ( + "context" + "errors" + "fmt" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/datastore" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/ballots" + "github.com/spacemeshos/go-spacemesh/sql/blocks" + "github.com/spacemeshos/go-spacemesh/sql/certificates" + "github.com/spacemeshos/go-spacemesh/sql/layers" + "github.com/spacemeshos/go-spacemesh/system" +) + +// Recover tortoise state from database. +func Recover(db *datastore.CachedDB, beacon system.BeaconGetter, opts ...Opt) (*Tortoise, error) { + trtl, err := New(opts...) + if err != nil { + return nil, err + } + latest, err := ballots.LatestLayer(db) + if err != nil { + return nil, fmt.Errorf("failed to load latest known layer: %v", err) + } + if latest <= types.GetEffectiveGenesis() { + return trtl, nil + } + for lid := types.GetEffectiveGenesis().Add(1); !lid.After(latest); lid = lid.Add(1) { + if err := RecoverLayer(context.Background(), trtl, db, beacon, lid); err != nil { + return nil, fmt.Errorf("failed to load tortoise state at layer %d: %w", lid, err) + } + } + return trtl, nil +} + +func RecoverLayer(ctx context.Context, trtl *Tortoise, db *datastore.CachedDB, beacon system.BeaconGetter, lid types.LayerID) error { + if lid.FirstInEpoch() { + if err := db.IterateEpochATXHeaders(lid.GetEpoch(), func(header *types.ActivationTxHeader) bool { + trtl.OnAtx(header) + return true + }); err != nil { + return err + } + beacon, err := beacon.GetBeacon(lid.GetEpoch()) + if err != nil && !errors.Is(err, sql.ErrNotFound) { + return err + } + if err == nil { + trtl.OnBeacon(lid.GetEpoch(), beacon) + } + } + blocksrst, err := blocks.Layer(db, lid) + if err != nil { + return err + } + for _, block := range blocksrst { + valid, err := blocks.IsValid(db, block.ID()) + if err != nil && errors.Is(err, sql.ErrNotFound) { + return err + } + if valid { + trtl.OnValidBlock(block.ToVote()) + } else { + trtl.OnBlock(block.ToVote()) + } + hare, err := certificates.GetHareOutput(db, lid) + if err != nil && !errors.Is(err, sql.ErrNotFound) { + return err + } + if err == nil { + trtl.OnHareOutput(lid, hare) + } + } + ballotsrst, err := ballots.Layer(db, lid) + if err != nil { + return err + } + for _, ballot := range ballotsrst { + trtl.OnBallot(ballot) + } + coin, err := layers.GetWeakCoin(db, lid) + if err != nil && !errors.Is(err, sql.ErrNotFound) { + return err + } + if err == nil { + trtl.OnWeakCoin(lid, coin) + } + trtl.TallyVotes(ctx, lid) + return nil +} diff --git a/tortoise/recover_test.go b/tortoise/recover_test.go new file mode 100644 index 0000000000..673037716a --- /dev/null +++ b/tortoise/recover_test.go @@ -0,0 +1,32 @@ +package tortoise + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/datastore" + "github.com/spacemeshos/go-spacemesh/system" +) + +type recoveryAdapter struct { + testing.TB + *Tortoise + db *datastore.CachedDB + beacon system.BeaconGetter + + prev types.LayerID +} + +func (a *recoveryAdapter) TallyVotes(ctx context.Context, current types.LayerID) { + genesis := types.GetEffectiveGenesis() + if a.prev == 0 { + a.prev = genesis + } + for lid := a.prev; lid <= current; lid++ { + require.NoError(a, RecoverLayer(ctx, a.Tortoise, a.db, a.beacon, lid)) + a.prev = lid + } +} diff --git a/tortoise/state.go b/tortoise/state.go index abc9b94e4a..fc4fe927bb 100644 --- a/tortoise/state.go +++ b/tortoise/state.go @@ -25,12 +25,18 @@ type ( referenceHeight uint64 } + atxInfo struct { + weight uint64 + height uint64 + } + epochInfo struct { - atxs map[types.ATXID]uint64 + atxs map[types.ATXID]atxInfo // weight is a sum of all atxs weight weight // median height from atxs height uint64 + beacon *types.Beacon } state struct { @@ -97,7 +103,7 @@ func (s *state) epoch(eid types.EpochID) *epochInfo { epoch, exist := s.epochs[eid] if !exist { epochsNumber.Inc() - epoch = &epochInfo{atxs: map[types.ATXID]uint64{}} + epoch = &epochInfo{atxs: map[types.ATXID]atxInfo{}} s.epochs[eid] = epoch } return epoch @@ -161,6 +167,7 @@ type layerInfo struct { hareTerminated bool blocks []*blockInfo verifying verifyingInfo + coinflip sign opinion types.Hash32 // a pointer to the value stored on the previous layerInfo object @@ -480,3 +487,15 @@ func decodeVotes(evicted types.LayerID, blid types.LayerID, base *ballotInfo, ex } return decoded, from, nil } + +func activeSetWeight(epoch *epochInfo, aset []types.ATXID) (uint64, error) { + var weight uint64 + for _, id := range aset { + atx, exists := epoch.atxs[id] + if !exists { + return 0, fmt.Errorf("atx %v is not in state", id) + } + weight += atx.weight + } + return weight, nil +} diff --git a/tortoise/threshold.go b/tortoise/threshold.go index 95e63170b0..5e5adc271b 100644 --- a/tortoise/threshold.go +++ b/tortoise/threshold.go @@ -1,13 +1,11 @@ package tortoise import ( - "fmt" "sort" "github.com/spacemeshos/fixed" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/datastore" ) const ( @@ -20,14 +18,6 @@ const ( // cancels their weight (adversarialWeightFraction) - honest nodes should still cross local threshold. ) -func getBallotHeight(cdb *datastore.CachedDB, ballot *types.Ballot) (uint64, error) { - atx, err := cdb.GetAtxHeader(ballot.AtxID) - if err != nil { - return 0, fmt.Errorf("read atx for ballot height: %w", err) - } - return atx.TickHeight(), nil -} - func getMedian(heights []uint64) uint64 { if len(heights) == 0 { return 0 diff --git a/tortoise/tortoise.go b/tortoise/tortoise.go index 4f34f56a4d..5f63e91117 100644 --- a/tortoise/tortoise.go +++ b/tortoise/tortoise.go @@ -6,20 +6,14 @@ import ( "errors" "fmt" "math" + "math/big" "time" "github.com/spacemeshos/fixed" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/log" - putil "github.com/spacemeshos/go-spacemesh/proposals/util" - "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/ballots" - "github.com/spacemeshos/go-spacemesh/sql/blocks" - "github.com/spacemeshos/go-spacemesh/sql/certificates" - "github.com/spacemeshos/go-spacemesh/sql/layers" - "github.com/spacemeshos/go-spacemesh/system" + "github.com/spacemeshos/go-spacemesh/proposals/util" "github.com/spacemeshos/go-spacemesh/tortoise/metrics" ) @@ -28,9 +22,7 @@ var errBeaconUnavailable = errors.New("beacon unavailable") type turtle struct { Config logger log.Log - cdb *datastore.CachedDB - beacons system.BeaconGetter updated map[types.LayerID]map[types.BlockID]bool *state @@ -48,18 +40,11 @@ type turtle struct { } // newTurtle creates a new verifying tortoise algorithm instance. -func newTurtle( - logger log.Log, - cdb *datastore.CachedDB, - beacons system.BeaconGetter, - config Config, -) *turtle { +func newTurtle(logger log.Log, config Config) *turtle { t := &turtle{ - Config: config, - state: newState(), - logger: logger, - cdb: cdb, - beacons: beacons, + Config: config, + state: newState(), + logger: logger, } genesis := types.GetEffectiveGenesis() @@ -68,7 +53,7 @@ func newTurtle( t.verified = genesis t.evicted = genesis.Sub(1) - t.epochs[genesis.GetEpoch()] = &epochInfo{atxs: map[types.ATXID]uint64{}} + t.epochs[genesis.GetEpoch()] = &epochInfo{atxs: map[types.ATXID]atxInfo{}} t.layers[genesis] = &layerInfo{ lid: genesis, hareTerminated: true, @@ -293,18 +278,15 @@ func (t *turtle) getFullVote(verified, current types.LayerID, block *blockInfo) if vote != abstain { return vote, reasonLocalThreshold, nil } - coin, err := layers.GetWeakCoin(t.cdb, current.Sub(1)) - if err != nil { + layer := t.layer(current.Sub(1)) + if layer.coinflip == neutral { return 0, "", fmt.Errorf("coinflip is not recorded in %s. required for vote on %s / %s", current.Sub(1), block.id, block.layer) } - if coin { - return support, reasonCoinflip, nil - } - return against, reasonCoinflip, nil + return layer.coinflip, reasonCoinflip, nil } -func (t *turtle) onLayer(ctx context.Context, last types.LayerID) error { +func (t *turtle) onLayer(ctx context.Context, last types.LayerID) { t.logger.With().Debug("on layer", last) defer t.evict(ctx) if last.After(t.last) { @@ -312,13 +294,11 @@ func (t *turtle) onLayer(ctx context.Context, last types.LayerID) error { lastLayer.Set(float64(t.last)) } if err := t.drainRetriable(); err != nil { - return nil + return } for process := t.processed.Add(1); !process.After(t.last); process = process.Add(1) { if process.FirstInEpoch() { - if err := t.loadAtxs(process.GetEpoch()); err != nil { - return err - } + t.computeEpochHeight(process.GetEpoch()) } layer := t.layer(process) for _, block := range layer.blocks { @@ -338,18 +318,11 @@ func (t *turtle) onLayer(ctx context.Context, last types.LayerID) error { if errors.Is(err, errBeaconUnavailable) { t.retryLater(ballot) } else { - return err + panic(err) } } } - if err := t.loadBlocksData(process); err != nil { - return err - } - if err := t.loadBallots(process); err != nil { - return err - } - layer.prevOpinion = &prev.opinion layer.computeOpinion(t.Hdist, t.last) t.logger.With().Debug("initial local opinion", @@ -366,7 +339,6 @@ func (t *turtle) onLayer(ctx context.Context, last types.LayerID) error { } } t.verifyLayers() - return nil } func (t *turtle) switchModes(logger log.Log) { @@ -470,94 +442,33 @@ func (t *turtle) verifyLayers() { verifiedLayer.Set(float64(t.verified)) } -// loadBlocksData loads blocks, hare output and contextual validity. -func (t *turtle) loadBlocksData(lid types.LayerID) error { - blocks, err := blocks.Layer(t.cdb, lid) - if err != nil { - return fmt.Errorf("read blocks for layer %s: %w", lid, err) - } - for _, block := range blocks { - t.onBlock(block.ToVote()) - } - if err := t.loadHare(lid); err != nil { - return err - } - return t.loadContextualValidity(lid) -} - -func (t *turtle) loadHare(lid types.LayerID) error { - output, err := certificates.GetHareOutput(t.cdb, lid) - if err == nil { - t.onHareOutput(lid, output) - return nil - } - if errors.Is(err, sql.ErrNotFound) { - t.logger.With().Debug("hare output for layer is not found", lid) - return nil - } - return fmt.Errorf("get hare output %s: %w", lid, err) -} - -func (t *turtle) loadContextualValidity(lid types.LayerID) error { - // validities will be available only during rerun or - // if they are synced from peers - for _, block := range t.layer(lid).blocks { - valid, err := blocks.IsValid(t.cdb, block.id) - if err != nil { - if !errors.Is(err, blocks.ErrValidityNotDecided) { - return err - } - } else if valid { - block.validity = support - } else { - block.validity = against - } - } - return nil -} - -// loadAtxs and compute reference height. -func (t *turtle) loadAtxs(epoch types.EpochID) error { - var heights []uint64 - if err := t.cdb.IterateEpochATXHeaders(epoch, func(header *types.ActivationTxHeader) bool { - t.onAtx(header) - heights = append(heights, header.TickHeight()) - return true - }); err != nil { - return fmt.Errorf("computing epoch data for %d: %w", epoch, err) - } +func (t *turtle) computeEpochHeight(epoch types.EpochID) { einfo := t.epoch(epoch) - einfo.height = getMedian(heights) - return nil -} - -func (t *turtle) loadBallots(lid types.LayerID) error { - blts, err := ballots.Layer(t.cdb, lid) - if err != nil { - return fmt.Errorf("read ballots for layer %s: %w", lid, err) + heights := make([]uint64, 0, len(einfo.atxs)) + for _, info := range einfo.atxs { + heights = append(heights, info.height) } - - for _, ballot := range blts { - if err := t.onBallot(ballot); err != nil { - t.logger.With().Error("failed to add ballot to the state", log.Err(err), log.Inline(ballot)) - } - } - return nil + einfo.height = getMedian(heights) } -func (t *turtle) onBlock(header types.Vote) { +func (t *turtle) onBlock(header types.BlockHeader, data bool, valid bool) { if header.LayerID <= t.evicted { return } - if binfo := t.state.getBlock(header); binfo != nil { - binfo.data = true + binfo.data = data + if valid { + binfo.validity = support + } return } t.logger.With().Debug("on data block", log.Inline(&header)) binfo := newBlockInfo(header) - binfo.data = true + binfo.data = data + if valid { + binfo.validity = support + } t.addBlock(binfo) } @@ -630,7 +541,7 @@ func (t *turtle) onAtx(atx *types.ActivationTxHeader) { log.Uint32("epoch", uint32(atx.TargetEpoch())), log.Uint64("weight", atx.GetWeight()), ) - epoch.atxs[atx.ID] = atx.GetWeight() + epoch.atxs[atx.ID] = atxInfo{weight: atx.GetWeight(), height: atx.TickHeight()} if atx.GetWeight() > math.MaxInt64 { // atx weight is not expected to overflow int64 t.logger.With().Fatal("fixme: atx size overflows int64", log.Uint64("weight", atx.GetWeight())) @@ -682,19 +593,23 @@ func (t *turtle) decodeBallot(ballot *types.Ballot) (*ballotInfo, types.LayerID, } if ballot.EpochData != nil { - beacon := ballot.EpochData.Beacon - height, err := getBallotHeight(t.cdb, ballot) + epoch := t.epoch(ballot.Layer.GetEpoch()) + atx, exists := epoch.atxs[ballot.AtxID] + if !exists { + return nil, 0, fmt.Errorf("atx %s/%d not in state", ballot.AtxID, ballot.Layer.GetEpoch()) + } + total, err := activeSetWeight(epoch, ballot.ActiveSet) if err != nil { return nil, 0, err } - refweight, err := putil.ComputeWeightPerEligibility(t.cdb, ballot, t.LayerSize, types.GetLayersPerEpoch()) + expected, err := util.GetNumEligibleSlots(atx.weight, total, t.LayerSize, types.GetLayersPerEpoch()) if err != nil { return nil, 0, err } refinfo = &referenceInfo{ - height: height, - beacon: beacon, - weight: refweight, + height: atx.height, + beacon: ballot.EpochData.Beacon, + weight: big.NewRat(int64(atx.weight), int64(expected)), } } else { ref, exists := t.state.ballotRefs[ballot.RefBallot] @@ -786,17 +701,18 @@ func (t *turtle) onBallot(ballot *types.Ballot) error { return nil } -func (t *turtle) compareBeacons(logger log.Log, bid types.BallotID, layerID types.LayerID, beacon types.Beacon) (bool, error) { - epochBeacon, err := t.beacons.GetBeacon(layerID.GetEpoch()) - if err != nil { - return false, err +func (t *turtle) compareBeacons(logger log.Log, bid types.BallotID, lid types.LayerID, beacon types.Beacon) (bool, error) { + epoch := t.epoch(lid.GetEpoch()) + if epoch.beacon == nil { + return false, errBeaconUnavailable } - if beacon != epochBeacon { + if beacon != *epoch.beacon { logger.With().Debug("ballot has different beacon", - layerID, - bid, - log.String("ballot_beacon", beacon.ShortString()), - log.String("epoch_beacon", epochBeacon.ShortString())) + log.Uint32("layer_id", lid.Uint32()), + log.Stringer("block", bid), + log.ShortStringer("ballot_beacon", beacon), + log.ShortStringer("epoch_beacon", epoch.beacon), + ) return true, nil } return false, nil diff --git a/tortoise/tortoise_test.go b/tortoise/tortoise_test.go index 5fb5dc3259..218e586b37 100644 --- a/tortoise/tortoise_test.go +++ b/tortoise/tortoise_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/spacemeshos/fixed" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,7 +16,6 @@ import ( "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" @@ -27,8 +25,6 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/certificates" "github.com/spacemeshos/go-spacemesh/sql/identities" "github.com/spacemeshos/go-spacemesh/sql/layers" - "github.com/spacemeshos/go-spacemesh/system" - smocks "github.com/spacemeshos/go-spacemesh/system/mocks" "github.com/spacemeshos/go-spacemesh/tortoise/opinionhash" "github.com/spacemeshos/go-spacemesh/tortoise/sim" ) @@ -40,10 +36,6 @@ func TestMain(m *testing.M) { os.Exit(res) } -func newCachedDB(t *testing.T, logger log.Log) *datastore.CachedDB { - return datastore.NewCachedDB(sql.InMemory(), logger) -} - const ( defaultTestLayerSize = 3 defaultTestWindowSize = 30 @@ -215,7 +207,7 @@ func TestAbstainLateBlock(t *testing.T) { block := types.Block{} block.LayerIndex = last.Sub(1) block.Initialize() - tortoise.OnBlock(&block) + tortoise.OnBlock(block.ToVote()) tortoise.OnHareOutput(block.LayerIndex, block.ID()) tortoise.TallyVotes(ctx, last) @@ -309,26 +301,6 @@ func TestEncodeAbstainVotesDelayedHare(t *testing.T) { require.Equal(t, votes.Abstain, []types.LayerID{types.LayerID(9)}) } -func mockedBeacons(tb testing.TB) system.BeaconGetter { - tb.Helper() - - ctrl := gomock.NewController(tb) - mockBeacons := smocks.NewMockBeaconGetter(ctrl) - mockBeacons.EXPECT().GetBeacon(gomock.Any()).Return(types.EmptyBeacon, nil).AnyTimes() - return mockBeacons -} - -func defaultTurtle(tb testing.TB) *turtle { - lg := logtest.New(tb) - cdb := datastore.NewCachedDB(sql.InMemory(), lg) - return newTurtle( - lg, - cdb, - mockedBeacons(tb), - defaultTestConfig(), - ) -} - func defaultTestConfig() Config { return Config{ LayerSize: defaultTestLayerSize, @@ -340,15 +312,20 @@ func defaultTestConfig() Config { } } -func tortoiseFromSimState(tb testing.TB, state sim.State, opts ...Opt) *Tortoise { - trtl, err := New(state.DB, state.Beacons, opts...) +func tortoiseFromSimState(tb testing.TB, state sim.State, opts ...Opt) *recoveryAdapter { + trtl, err := New(opts...) require.NoError(tb, err) - return trtl + return &recoveryAdapter{ + TB: tb, + Tortoise: trtl, + db: state.DB, + beacon: state.Beacons, + } } -func defaultAlgorithm(tb testing.TB, cdb *datastore.CachedDB) *Tortoise { +func defaultAlgorithm(tb testing.TB) *Tortoise { tb.Helper() - trtl, err := New(cdb, mockedBeacons(tb), + trtl, err := New( WithConfig(defaultTestConfig()), WithLogger(logtest.New(tb)), ) @@ -542,7 +519,11 @@ func TestOutOfOrderLayersAreVerified(t *testing.T) { require.Equal(t, last.Sub(1), verified) } -func processBlockUpdates(tb testing.TB, tt *Tortoise, db sql.Executor) { +type updater interface { + Updates() map[types.LayerID]map[types.BlockID]bool +} + +func processBlockUpdates(tb testing.TB, tt updater, db sql.Executor) { updated := tt.Updates() for _, bids := range updated { for bid, valid := range bids { @@ -793,22 +774,18 @@ func TestBallotHasGoodBeacon(t *testing.T) { epochBeacon := types.RandomBeacon() ballot := randomRefBallot(t, layerID, epochBeacon) - mockBeacons := smocks.NewMockBeaconGetter(gomock.NewController(t)) - trtl := defaultTurtle(t) - trtl.beacons = mockBeacons + trtl := defaultAlgorithm(t) logger := logtest.New(t) - // good beacon - mockBeacons.EXPECT().GetBeacon(layerID.GetEpoch()).Return(epochBeacon, nil).Times(1) - badBeacon, err := trtl.compareBeacons(logger, ballot.ID(), ballot.Layer, epochBeacon) + trtl.OnBeacon(layerID.GetEpoch(), epochBeacon) + badBeacon, err := trtl.trtl.compareBeacons(logger, ballot.ID(), ballot.Layer, epochBeacon) assert.NoError(t, err) assert.False(t, badBeacon) // bad beacon beacon := types.RandomBeacon() require.NotEqual(t, epochBeacon, beacon) - mockBeacons.EXPECT().GetBeacon(layerID.GetEpoch()).Return(epochBeacon, nil).Times(1) - badBeacon, err = trtl.compareBeacons(logger, ballot.ID(), ballot.Layer, beacon) + badBeacon, err = trtl.trtl.compareBeacons(logger, ballot.ID(), ballot.Layer, beacon) assert.NoError(t, err) assert.True(t, badBeacon) } @@ -983,9 +960,13 @@ func outOfWindowBaseBallot(n, window int) sim.VotesGenerator { } } +type voter interface { + EncodeVotes(ctx context.Context, opts ...EncodeVotesOpts) (*types.Opinion, error) +} + // tortoiseVoting is for testing that protocol makes progress using heuristic that we are // using for the network. -func tortoiseVoting(tortoise *Tortoise) sim.VotesGenerator { +func tortoiseVoting(tortoise voter) sim.VotesGenerator { return func(rng *rand.Rand, layers []*types.Layer, i int) sim.Voting { votes, err := tortoise.EncodeVotes(context.Background()) if err != nil { @@ -995,7 +976,7 @@ func tortoiseVoting(tortoise *Tortoise) sim.VotesGenerator { } } -func tortoiseVotingWithCurrent(tortoise *Tortoise) sim.VotesGenerator { +func tortoiseVotingWithCurrent(tortoise voter) sim.VotesGenerator { return func(rng *rand.Rand, layers []*types.Layer, i int) sim.Voting { current := types.GetEffectiveGenesis().Add(1) if len(layers) > 0 { @@ -1388,11 +1369,11 @@ func TestComputeLocalOpinion(t *testing.T) { tortoise.TallyVotes(ctx, lid) } - err := tortoise.trtl.loadBlocksData(tc.lid) - require.NoError(t, err) - blks, err := blocks.Layer(s.GetState(0).DB, tc.lid) require.NoError(t, err) + for _, block := range blks { + tortoise.OnBlock(block.ToVote()) + } for _, block := range blks { header := block.ToVote() vote, _ := getLocalVote( @@ -1513,24 +1494,22 @@ func TestComputeBallotWeight(t *testing.T) { atxids []types.ATXID ) - cdb := newCachedDB(t, logtest.New(t)) cfg := DefaultConfig() cfg.LayerSize = tc.layerSize - trtl, err := New(cdb, nil, WithLogger(logtest.New(t)), WithConfig(cfg)) + trtl, err := New(WithLogger(logtest.New(t)), WithConfig(cfg)) require.NoError(t, err) lid := types.LayerID(111) for _, weight := range tc.atxs { - atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{ - NumUnits: uint32(weight), - }} - atx.PublishEpoch = lid.GetEpoch() - 1 atxID := types.RandomATXID() - atx.SetID(atxID) - atx.SetEffectiveNumUnits(atx.NumUnits) - atx.SetReceived(time.Now()) - vAtx, err := atx.Verify(0, 1) - require.NoError(t, err) - require.NoError(t, atxs.Add(cdb, vAtx)) + header := &types.ActivationTxHeader{ + NumUnits: uint32(weight), + ID: atxID, + EffectiveNumUnits: uint32(weight), + } + header.PublishEpoch = lid.GetEpoch() - 1 + header.BaseTickHeight = 0 + header.TickCount = 1 + trtl.OnAtx(header) atxids = append(atxids, atxID) } @@ -1636,8 +1615,8 @@ func TestNetworkRecoversFromFullPartition(t *testing.T) { mergedBlocks, err := blocks.Layer(s1.GetState(0).DB, lid) require.NoError(t, err) for _, block := range mergedBlocks { - tortoise1.OnBlock(block) - tortoise2.OnBlock(block) + tortoise1.OnBlock(block.ToVote()) + tortoise2.OnBlock(block.ToVote()) } mergedBallots, err := ballots.Layer(s1.GetState(0).DB, lid) require.NoError(t, err) @@ -1858,7 +1837,7 @@ func TestLateBlock(t *testing.T) { require.True(t, len(block.TxIDs) > 2) block.TxIDs = block.TxIDs[:2] block.Initialize() - tortoise.OnBlock(&block) + tortoise.OnBlock(block.ToVote()) require.NoError(t, blocks.Add(s.GetState(0).DB, &block)) for _, last = range sim.GenLayers(s, @@ -2631,14 +2610,13 @@ func TestNonTerminatedLayers(t *testing.T) { func TestEncodeVotes(t *testing.T) { ctx := context.Background() t.Run("support", func(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - tortoise := defaultAlgorithm(t, cdb) + tortoise := defaultAlgorithm(t) block := types.Block{} block.LayerIndex = types.GetEffectiveGenesis().Add(1) block.Initialize() - tortoise.OnBlock(&block) + tortoise.OnBlock(block.ToVote()) tortoise.OnHareOutput(block.LayerIndex, block.ID()) tortoise.TallyVotes(ctx, block.LayerIndex.Add(1)) @@ -2654,8 +2632,7 @@ func TestEncodeVotes(t *testing.T) { require.Equal(t, hasher.Sum(nil), opinion.Hash[:]) }) t.Run("against", func(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - tortoise := defaultAlgorithm(t, cdb) + tortoise := defaultAlgorithm(t) tortoise.OnHareOutput(types.GetEffectiveGenesis().Add(1), types.EmptyBlockID) current := types.GetEffectiveGenesis().Add(2) @@ -2671,8 +2648,7 @@ func TestEncodeVotes(t *testing.T) { require.Equal(t, hasher.Sum(nil), opinion.Hash[:]) }) t.Run("abstain", func(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - tortoise := defaultAlgorithm(t, cdb) + tortoise := defaultAlgorithm(t) current := types.GetEffectiveGenesis().Add(2) tortoise.TallyVotes(ctx, current) @@ -2689,11 +2665,10 @@ func TestEncodeVotes(t *testing.T) { require.Equal(t, hasher.Sum(nil), opinion.Hash[:]) }) t.Run("support multiple", func(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) cfg := defaultTestConfig() cfg.Hdist = 1 cfg.Zdist = 1 - tortoise, err := New(cdb, mockedBeacons(t), + tortoise, err := New( WithConfig(cfg), WithLogger(logtest.New(t)), ) @@ -2706,12 +2681,11 @@ func TestEncodeVotes(t *testing.T) { } for _, block := range blks { block.Initialize() - tortoise.OnBlock(block) - require.NoError(t, blocks.Add(cdb, block)) + tortoise.OnBlock(block.ToVote()) } current := lid.Add(2) - require.NoError(t, layers.SetWeakCoin(cdb, current.Sub(1), true)) + tortoise.OnWeakCoin(current.Sub(1), true) tortoise.TallyVotes(ctx, current) opinion, err := tortoise.EncodeVotes(ctx, EncodeVotesWithCurrent(current)) @@ -2732,8 +2706,7 @@ func TestEncodeVotes(t *testing.T) { require.Equal(t, hasher.Sum(nil), opinion.Hash[:]) }) t.Run("rewrite before base", func(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - tortoise, err := New(cdb, mockedBeacons(t), + tortoise, err := New( WithConfig(defaultTestConfig()), WithLogger(logtest.New(t)), ) @@ -2742,21 +2715,23 @@ func TestEncodeVotes(t *testing.T) { hare := types.GetEffectiveGenesis().Add(1) block := types.Block{InnerBlock: types.InnerBlock{LayerIndex: hare}} block.Initialize() - tortoise.OnBlock(&block) + tortoise.OnBlock(block.ToVote()) tortoise.OnHareOutput(hare, block.ID()) lid := hare.Add(1) ballot := types.Ballot{} atxid := types.ATXID{1} - atx := &types.ActivationTx{} - atx.NumUnits = 10 - atx.SetID(atxid) - atx.SetEffectiveNumUnits(atx.NumUnits) - atx.SetReceived(time.Now()) - vatx, err := atx.Verify(1, 1) - require.NoError(t, err) - require.NoError(t, atxs.Add(cdb, vatx)) + header := &types.ActivationTxHeader{ + NumUnits: 10, + EffectiveNumUnits: 10, + ID: atxid, + BaseTickHeight: 1, + TickCount: 1, + } + header.PublishEpoch = lid.GetEpoch() - 1 + tortoise.OnAtx(header) + tortoise.OnBeacon(lid.GetEpoch(), types.EmptyBeacon) ballot.EpochData = &types.EpochData{ActiveSetHash: types.Hash32{1, 2, 3}} ballot.ActiveSet = []types.ATXID{atxid} @@ -2860,8 +2835,7 @@ func TestBaseBallotBeforeCurrentLayer(t *testing.T) { } func TestMissingActiveSet(t *testing.T) { - cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t)) - tortoise := defaultAlgorithm(t, cdb) + tortoise := defaultAlgorithm(t) epoch := types.EpochID(3) aset := []types.ATXID{ types.ATXID(types.BytesToHash([]byte("first"))), @@ -2999,10 +2973,7 @@ func TestMultipleTargets(t *testing.T) { require.NoError(t, err) require.Len(t, votes.Against, 1) require.Equal(t, votes.Against[0], block.Header) - tortoise.OnBlock(types.NewExistingBlock(block.Header.ID, types.InnerBlock{ - LayerIndex: block.Header.LayerID, - TickHeight: block.Header.Height, - })) + tortoise.OnBlock(block.Header) votes, err = tortoise.EncodeVotes(ctx) require.NoError(t, err) require.Empty(t, votes.Against) diff --git a/tortoise/verifying.go b/tortoise/verifying.go index 0aca28cd4f..6871e7fca0 100644 --- a/tortoise/verifying.go +++ b/tortoise/verifying.go @@ -42,8 +42,8 @@ func (v *verifying) countBallot(logger log.Log, ballot *ballotInfo) { logger.With().Debug("count ballot in verifying mode", ballot.layer, ballot.id, - log.Stringer("ballot opinion", ballot.opinion()), - log.Stringer("local opinion", prev.opinion), + log.ShortStringer("ballot opinion", ballot.opinion()), + log.ShortStringer("local opinion", prev.opinion), log.Bool("bad beacon", ballot.conditions.badBeacon), log.Stringer("weight", ballot.weight), log.Uint64("reference height", prev.verifying.referenceHeight),