Skip to content

Commit

Permalink
data-driven tortoise (spacemeshos#3888)
Browse files Browse the repository at this point in the history
All interactions with tortoise now go through the data-driven interface, with the following goals:
- improve reasoning about tortoise dependencies and timings
- predictable performance (for example loading many objects from disk is not predictable)
- enable tracing and further simplify testing

Specifically for tracing, the plan is to dump all passed data inputs to disk on a subset of nodes in the cloud, and then rerun tortoise in the exact observed order. This will be an upcoming change.

other changes:
- state from disk is loaded using the Recover method; the same approach is reused for tests, since they were writing generated state to disk and refactoring all of them is a long process
- additionally, weak coin and beacon have to be reported externally
  • Loading branch information
dshulyak committed May 9, 2023
1 parent d87d1e9 commit da7d48e
Show file tree
Hide file tree
Showing 27 changed files with 501 additions and 353 deletions.
22 changes: 22 additions & 0 deletions beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spacemeshos/go-spacemesh/beacon/weakcoin"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/common/types/result"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
Expand Down Expand Up @@ -128,6 +129,7 @@ func New(
beacons: make(map[types.EpochID]types.Beacon),
ballotsBeacons: make(map[types.EpochID]map[types.Beacon]*beaconWeight),
states: make(map[types.EpochID]*state),
results: make(chan result.Beacon, 100),
}
for _, opt := range opts {
opt(pd)
Expand Down Expand Up @@ -203,6 +205,7 @@ type ProtocolDriver struct {
// the map key is the epoch when the ballot is published. the beacon value is calculated in the
// previous epoch and used in the current epoch.
ballotsBeacons map[types.EpochID]map[types.Beacon]*beaconWeight
results chan result.Beacon

// metrics
metricsCollector *metrics.BeaconMetricsCollector
Expand Down Expand Up @@ -250,6 +253,7 @@ func (pd *ProtocolDriver) UpdateBeacon(epoch types.EpochID, beacon types.Beacon)
}
pd.beacons[epoch] = beacon
pd.logger.With().Info("using fallback beacon", epoch, beacon)
pd.onResult(epoch, beacon)
return nil
}

Expand All @@ -262,9 +266,26 @@ func (pd *ProtocolDriver) Close() {
if err := pd.eg.Wait(); err != nil {
pd.logger.With().Info("received error waiting for goroutines to finish", log.Err(err))
}
close(pd.results)
pd.logger.Info("beacon goroutines finished")
}

// onResult publishes the beacon for epoch on the results channel without
// blocking. If the channel cannot accept the value right away, the result is
// not delivered and an error is logged instead.
//
// NOTE(review): Close closes pd.results; a send on a closed channel panics,
// so confirm that no caller can reach onResult after Close.
func (pd *ProtocolDriver) onResult(epoch types.EpochID, beacon types.Beacon) {
	rst := result.Beacon{Epoch: epoch, Beacon: beacon}
	select {
	case pd.results <- rst:
		return
	default:
	}
	// Nobody could take the result immediately; report it rather than block.
	pd.logger.With().Error("results queue is congested",
		log.Uint32("epoch_id", epoch.Uint32()),
		log.Stringer("beacon", beacon),
	)
}

// Results returns the receive-only channel on which beacons are reported as
// result.Beacon values, both those stored by setBeacon and fallback beacons
// applied through UpdateBeacon.
// The channel is buffered; onResult does not block when it is full and logs
// an error instead of delivering the result. Close closes the channel.
func (pd *ProtocolDriver) Results() <-chan result.Beacon {
return pd.results
}

// isClosed returns true if the beacon protocol is shutting down.
func (pd *ProtocolDriver) isClosed() bool {
select {
Expand Down Expand Up @@ -478,6 +499,7 @@ func (pd *ProtocolDriver) setBeacon(targetEpoch types.EpochID, beacon types.Beac
return fmt.Errorf("persist beacon: %w", err)
}
pd.beacons[targetEpoch] = beacon
pd.onResult(targetEpoch, beacon)
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions beacon/beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/common/types/result"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/p2p"
Expand Down Expand Up @@ -267,6 +268,16 @@ func TestBeacon_NoProposals(t *testing.T) {
}
}

// getNoWait returns the next value from results without blocking.
// If nothing is ready to be received, it fails the test with
// "beacon is not available".
func getNoWait(tb testing.TB, results <-chan result.Beacon) result.Beacon {
	// Mark as a helper so the failure is reported at the calling test's line.
	tb.Helper()
	select {
	case rst := <-results:
		return rst
	default:
		require.Fail(tb, "beacon is not available")
		// Only reached if require.Fail returns; keeps the function well-formed.
		return result.Beacon{}
	}
}

func TestBeaconNotSynced(t *testing.T) {
tpd := setUpProtocolDriver(t)
tpd.mSync.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes()
Expand All @@ -281,6 +292,8 @@ func TestBeaconNotSynced(t *testing.T) {
bootstrap := types.Beacon{1, 2, 3, 4}
require.NoError(t, tpd.UpdateBeacon(types.EpochID(2), bootstrap))
got, err = tpd.GetBeacon(types.EpochID(2))
require.Equal(t, got, getNoWait(t, tpd.Results()).Beacon)

require.NoError(t, err)
require.Equal(t, bootstrap, got)

Expand Down
24 changes: 23 additions & 1 deletion cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/spacemeshos/go-spacemesh/proposals"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/system"
Expand Down Expand Up @@ -522,14 +523,21 @@ func (app *App) initServices(
if trtlCfg.BadBeaconVoteDelayLayers == 0 {
trtlCfg.BadBeaconVoteDelayLayers = app.Config.LayersPerEpoch
}
trtl, err := tortoise.New(app.cachedDB, beaconProtocol,
trtl, err := tortoise.Recover(app.cachedDB, beaconProtocol,
tortoise.WithContext(ctx),
tortoise.WithLogger(app.addLogger(TrtlLogger, lg)),
tortoise.WithConfig(trtlCfg),
)
if err != nil {
return fmt.Errorf("can't recover tortoise state: %w", err)
}
app.eg.Go(func() error {
for rst := range beaconProtocol.Results() {
trtl.OnBeacon(rst.Epoch, rst.Beacon)
}
app.log.Debug("beacon results watcher exited")
return nil
})

executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg))
msh, err := mesh.NewMesh(app.cachedDB, clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg))
Expand Down Expand Up @@ -656,6 +664,7 @@ func (app *App) initServices(
patrol,
app.hOracle,
clock,
tortoiseWeakCoin{db: app.cachedDB, tortoise: trtl},
app.addLogger(HareLogger, lg),
)

Expand Down Expand Up @@ -1270,3 +1279,16 @@ func decodeLoggers(cfg config.LoggerConfig) (map[string]string, error) {
}
return rst, nil
}

// tortoiseWeakCoin stores weak coin values in the database and forwards them
// to the tortoise (see Set). It is passed to hare in initServices.
type tortoiseWeakCoin struct {
// db is the executor handed to layers.SetWeakCoin.
db sql.Executor
// tortoise receives the OnWeakCoin notification after a successful write.
tortoise system.Tortoise
}

// Set stores the weak coin value for lid through layers.SetWeakCoin and, only
// when that succeeds, reports the same value to the tortoise via OnWeakCoin.
// An error from the store is returned unchanged and the tortoise is not notified.
func (w tortoiseWeakCoin) Set(lid types.LayerID, value bool) error {
	err := layers.SetWeakCoin(w.db, lid, value)
	if err == nil {
		w.tortoise.OnWeakCoin(lid, value)
	}
	return err
}
18 changes: 10 additions & 8 deletions common/types/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,24 @@ func (v *Votes) MarshalLogObject(encoder log.ObjectEncoder) error {
return nil
}

// Vote additionally carries layer id and height
// in order for the tortoise to count votes without downloading block body.
type Vote struct {
// BlockHeader identifies a block by its ID together with the layer it
// belongs to and its height.
type BlockHeader struct {
ID BlockID
LayerID LayerID
Height uint64
}

// MarshalLogObject implements logging interface.
func (s *Vote) MarshalLogObject(encoder log.ObjectEncoder) error {
encoder.AddString("id", s.ID.String())
encoder.AddUint32("layer", s.LayerID.Uint32())
encoder.AddUint64("height", s.Height)
// MarshalLogObject implements the logging interface. It writes the block id
// as a string under "id", the layer under "layer" and the height under
// "height", and always returns nil.
func (h *BlockHeader) MarshalLogObject(encoder log.ObjectEncoder) error {
	id := h.ID.String()
	layer := h.LayerID.Uint32()
	encoder.AddString("id", id)
	encoder.AddUint32("layer", layer)
	encoder.AddUint64("height", h.Height)
	return nil
}

// Vote is an alias for BlockHeader.
// It additionally carries layer id and height
// in order for the tortoise to count votes without downloading block body.
type Vote = BlockHeader

// Opinion is a tuple from opinion hash and votes that decode to opinion hash.
type Opinion struct {
Hash Hash32
Expand Down Expand Up @@ -291,7 +293,7 @@ func (b *Ballot) MarshalLogObject(encoder log.ObjectEncoder) error {
encoder.AddUint32("layer_id", b.Layer.Uint32())
encoder.AddUint32("epoch_id", uint32(b.Layer.GetEpoch()))
encoder.AddString("smesher", b.SmesherID.String())
encoder.AddString("opinion hash", b.OpinionHash.String())
encoder.AddString("opinion hash", b.OpinionHash.ShortString())
encoder.AddString("base_ballot", b.Votes.Base.String())
encoder.AddInt("support", len(b.Votes.Support))
encoder.AddInt("against", len(b.Votes.Against))
Expand Down
8 changes: 4 additions & 4 deletions common/types/ballot_scale.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions common/types/result/beacon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package result

import "github.com/spacemeshos/go-spacemesh/common/types"

// Beacon pairs an epoch with the beacon value reported for that epoch.
type Beacon struct {
// Epoch is the epoch the beacon applies to.
Epoch types.EpochID
// Beacon is the beacon value for Epoch.
Beacon types.Beacon
}
9 changes: 6 additions & 3 deletions hare/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (m *p2pManipulator) Publish(ctx context.Context, protocol string, payload [
// hareWithMocks bundles a Hare built by createTestHare with the mocks that
// tests set expectations on.
type hareWithMocks struct {
*Hare
// mockRoracle is the mocked Rolacle.
mockRoracle *mocks.MockRolacle
// mockCoin is the mocked weakCoin passed to New.
mockCoin *mocks.MockweakCoin
}

func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockClock, p2p pubsub.PublishSubsciber, name string) *hareWithMocks {
Expand All @@ -147,7 +148,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc
mockSyncS.EXPECT().IsBeaconSynced(gomock.Any()).Return(true).AnyTimes()

mockRoracle := mocks.NewMockRolacle(ctrl)

mockCoin := mocks.NewMockweakCoin(ctrl)
hare := New(
nil,
tcfg,
Expand All @@ -162,6 +163,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc
patrol,
mockStateQ,
clock,
mockCoin,
logtest.New(tb).WithName(name+"_"+signer.PublicKey().ShortString()),
withMesh(msh),
)
Expand All @@ -170,6 +172,7 @@ func createTestHare(tb testing.TB, msh mesh, tcfg config.Config, clock *mockCloc
return &hareWithMocks{
Hare: hare,
mockRoracle: mockRoracle,
mockCoin: mockCoin,
}
}

Expand Down Expand Up @@ -269,7 +272,6 @@ func Test_multipleCPs(t *testing.T) {
mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes()
mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes()
mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).AnyTimes()
mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).AnyTimes()
for lid := types.GetEffectiveGenesis().Add(1); !lid.After(finalLyr); lid = lid.Add(1) {
mockMesh.EXPECT().Proposals(lid).Return(pList[lid], nil)
for _, p := range pList[lid] {
Expand All @@ -294,6 +296,7 @@ func Test_multipleCPs(t *testing.T) {
h.mockRoracle.EXPECT().Proof(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(types.EmptyVrfSignature, nil).AnyTimes()
h.mockRoracle.EXPECT().CalcEligibility(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(uint16(1), nil).AnyTimes()
h.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
h.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes()
outputsWaitGroup.Add(1)
go func(idx int) {
defer outputsWaitGroup.Done()
Expand Down Expand Up @@ -391,7 +394,6 @@ func Test_multipleCPsAndIterations(t *testing.T) {
mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes()
mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes()
mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).AnyTimes()
mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).AnyTimes()
for lid := types.GetEffectiveGenesis().Add(1); !lid.After(finalLyr); lid = lid.Add(1) {
mockMesh.EXPECT().Proposals(lid).Return(pList[lid], nil)
for _, p := range pList[lid] {
Expand All @@ -417,6 +419,7 @@ func Test_multipleCPsAndIterations(t *testing.T) {
h.mockRoracle.EXPECT().Proof(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(types.EmptyVrfSignature, nil).AnyTimes()
h.mockRoracle.EXPECT().CalcEligibility(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(uint16(1), nil).AnyTimes()
h.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
h.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes()
outputsWaitGroup.Add(1)
go func(idx int) {
defer outputsWaitGroup.Done()
Expand Down
10 changes: 4 additions & 6 deletions hare/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/sql/proposals"
"github.com/spacemeshos/go-spacemesh/system"
)
Expand Down Expand Up @@ -88,10 +87,6 @@ func (m defaultMesh) Ballot(bid types.BallotID) (*types.Ballot, error) {
return ballots.Get(m, bid)
}

func (m defaultMesh) SetWeakCoin(lid types.LayerID, wc bool) error {
return layers.SetWeakCoin(m, lid, wc)
}

// Opt for configuring beacon protocol.
type Opt func(*Hare)

Expand All @@ -105,6 +100,7 @@ func withMesh(m mesh) Opt {
type Hare struct {
log.Log
msh mesh
weakCoin weakCoin
config config.Config
publisher pubsub.Publisher
layerClock LayerClock
Expand Down Expand Up @@ -152,6 +148,7 @@ func New(
patrol layerPatrol,
stateQ stateQuerier,
layerClock LayerClock,
weakCoin weakCoin,
logger log.Log,
opts ...Opt,
) *Hare {
Expand Down Expand Up @@ -181,6 +178,7 @@ func New(
h.beacons = beacons
h.rolacle = rolacle
h.patrol = patrol
h.weakCoin = weakCoin

h.networkDelta = conf.WakeupDelta
h.outputChan = make(chan TerminationOutput, h.config.Hdist)
Expand Down Expand Up @@ -530,7 +528,7 @@ func (h *Hare) outputCollectionLoop(ctx context.Context) {
// collect coinflip, regardless of success
logger.With().Debug("recording weak coin result for layer",
log.Bool("weak_coin", coin))
if err := h.msh.SetWeakCoin(layerID, coin); err != nil {
if err := h.weakCoin.Set(layerID, coin); err != nil {
logger.With().Error("failed to set weak coin for layer", log.Err(err))
}
if err := h.collectOutput(ctx, out); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion hare/hare_rounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func runNodesFor(t *testing.T, ctx context.Context, nodes, leaders, maxLayers, l
mockMesh.EXPECT().GetEpochAtx(gomock.Any(), gomock.Any()).Return(&types.ActivationTxHeader{BaseTickHeight: 11, TickCount: 1}, nil).AnyTimes()
mockMesh.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(0), nil).AnyTimes()
mockMesh.EXPECT().GetMalfeasanceProof(gomock.Any()).Return(nil, nil).AnyTimes()
mockMesh.EXPECT().SetWeakCoin(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

for i := 0; i < nodes; i++ {
host := mesh.Hosts()[i]
Expand All @@ -105,6 +104,7 @@ func runNodesFor(t *testing.T, ctx context.Context, nodes, leaders, maxLayers, l
return oracle(layer, round, committeeSize, id, sig, th)
}).AnyTimes()
th.mockRoracle.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
th.mockCoin.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes()
go func() {
for out := range th.blockGenCh {
validate(out.Layer, th)
Expand Down
Loading

0 comments on commit da7d48e

Please sign in to comment.