Skip to content

Commit

Permalink
Simplify fcu 1 (#13387)
Browse files Browse the repository at this point in the history
* Remove unsafe proposer indices cache

* Simplify FCU #1

This PR starts the process of gradually simplifying FCU
It removes the responsibility of getting the state and block from this
function and informing if head has changed. It is only called when the
imported block has actually become head.

* Add a call to FCU in edge cases
  • Loading branch information
potuz authored Dec 30, 2023
1 parent cff5e2b commit 9809f5a
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 90 deletions.
44 changes: 19 additions & 25 deletions beacon-chain/blockchain/forkchoice_update_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,50 +44,44 @@ func (s *Service) getStateAndBlock(ctx context.Context, r [32]byte) (state.Beaco
return headState, newHeadBlock, nil
}

type fcuConfig struct {
headState state.BeaconState
headBlock interfaces.ReadOnlySignedBeaconBlock
headRoot [32]byte
proposingSlot primitives.Slot
}

// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made.
// it returns true if the new head is updated
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte, proposingSlot primitives.Slot) (bool, error) {
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error {
_, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution")
defer span.End()
// Note: Use the service context here to avoid the parent context being ended during a forkchoice update.
ctx = trace.NewContext(s.ctx, span)

isNewHead := s.isNewHead(newHeadRoot)
if !isNewHead {
return false, nil
}

headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return false, nil
}

_, tracked := s.trackedProposer(headState, proposingSlot)
_, tracked := s.trackedProposer(args.headState, args.proposingSlot)
if (tracked || features.Get().PrepareAllPayloads) && !features.Get().DisableReorgLateBlocks {
if s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return false, nil
if s.shouldOverrideFCU(args.headRoot, args.proposingSlot) {
return nil
}
}

_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: newHeadRoot,
headBlock: headBlock.Block(),
_, err := s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: args.headState,
headRoot: args.headRoot,
headBlock: args.headBlock.Block(),
})
if err != nil {
return false, errors.Wrap(err, "could not notify forkchoice update")
return errors.Wrap(err, "could not notify forkchoice update")
}

if err := s.saveHead(ctx, newHeadRoot, headBlock, headState); err != nil {
if err := s.saveHead(ctx, args.headRoot, args.headBlock, args.headState); err != nil {
log.WithError(err).Error("could not save head")
}

// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(headBlock); err != nil {
if err := s.pruneAttsFromPool(args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")
}
return true, nil
return nil
}

// shouldOverrideFCU checks whether the incoming block is still subject to being
Expand Down
55 changes: 16 additions & 39 deletions beacon-chain/blockchain/forkchoice_update_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,14 @@ func TestService_getHeadStateAndBlock(t *testing.T) {
}

func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
opts := testServiceOptsWithDB(t)

service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.cfg.PayloadIDCache = cache.NewPayloadIDCache()
_, err = service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1)
require.NoError(t, err)
hookErr := "could not notify forkchoice update"
invalidStateErr := "could not get state summary: could not find block in DB"
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
gb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, service.saveInitSyncBlock(ctx, [32]byte{'a'}, gb))
_, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}, service.CurrentSlot()+1)
require.NoError(t, err)
require.LogsContain(t, hook, invalidStateErr)
service.cfg.TrackedValidatorsCache = cache.NewTrackedValidatorsCache()

hook.Reset()
service.head = &head{
root: [32]byte{'a'},
block: nil, /* should not panic if notify head uses correct head */
}

// Block in Cache
b := util.NewBeaconBlock()
b.Block.Slot = 2
wsb, err := blocks.NewSignedBeaconBlock(b)
Expand All @@ -99,12 +80,6 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1})
_, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot())
require.NoError(t, err)
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)

// Block in DB
b = util.NewBeaconBlock()
b.Block.Slot = 3
util.SaveBlock(t, ctx, service.cfg.BeaconDB, b)
Expand All @@ -117,19 +92,17 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1})
_, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1)
require.NoError(t, err)
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
args := &fcuConfig{
headState: st,
headRoot: r1,
headBlock: wsb,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))

payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2})
require.Equal(t, true, has)
require.Equal(t, primitives.PayloadID{1}, payloadID)

// Test zero headRoot returns immediately.
headRoot := service.headRoot()
_, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{}, service.CurrentSlot()+1)
require.NoError(t, err)
require.Equal(t, service.headRoot(), headRoot)
}

func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testing.T) {
Expand Down Expand Up @@ -173,9 +146,13 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin
service.head.block = sb
service.head.state = st
service.cfg.PayloadIDCache.Set(service.CurrentSlot()+1, [32]byte{} /* root */, [8]byte{})
_, err = service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1)
require.NoError(t, err)

args := &fcuConfig{
headState: st,
headBlock: sb,
headRoot: r,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
}

func TestShouldOverrideFCU(t *testing.T) {
Expand Down
42 changes: 31 additions & 11 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
if err != nil {
log.WithError(err).Warn("Could not update head")
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if blockRoot != headRoot {
receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot)
if err != nil {
Expand All @@ -88,10 +89,25 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
"headRoot": fmt.Sprintf("%#x", headRoot),
"headWeight": headWeight,
}).Debug("Head block is not the received block")
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

if headRoot == blockRoot {
if s.isNewHead(headRoot) {
headState, headBlock, err := s.getStateAndBlock(ctx, headRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
args := &fcuConfig{
headState: headState,
headBlock: headBlock,
headRoot: headRoot,
proposingSlot: s.CurrentSlot() + 1,
}
if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil {
return err
}
}
} else {
// Updating next slot state cache can happen in the background
// except in the epoch boundary in which case we lock to handle
// the shuffling and proposer caches updates.
Expand All @@ -114,19 +130,23 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
}
}()
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
args := &fcuConfig{
headState: postState,
headBlock: signed,
headRoot: headRoot,
proposingSlot: s.CurrentSlot() + 1,
}
if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil {
return err
}
}
// verify conditions for FCU, notifies FCU, and saves the new head.
// This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools.
if _, err := s.forkchoiceUpdateWithExecution(ctx, headRoot, s.CurrentSlot()+1); err != nil {
return err
}

optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blockRoot)
if err != nil {
log.WithError(err).Debug("Could not check if block is optimistic")
optimistic = true
}

// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Expand Down
33 changes: 18 additions & 15 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,31 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

start = time.Now()
// return early if we haven't changed head
newHeadRoot, err := s.cfg.ForkChoiceStore.Head(ctx)
if err != nil {
log.WithError(err).Error("Could not compute head from new attestations")
// Fallback to our current head root in the event of a failure.
s.headLock.RLock()
newHeadRoot = s.headRoot()
s.headLock.RUnlock()
return
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot)
if !s.isNewHead(newHeadRoot) {
return
}
headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("could not update forkchoice")
log.WithError(err).Error("could not get head block")
return
}
if changed {
s.headLock.RLock()
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
s.headLock.RUnlock()
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
args := &fcuConfig{
headState: headState,
headRoot: newHeadRoot,
headBlock: headBlock,
proposingSlot: proposingSlot,
}
if err := s.forkchoiceUpdateWithExecution(s.ctx, args); err != nil {
log.WithError(err).Error("could not update forkchoice")
}
log.WithField("newHeadRoot", fmt.Sprintf("%#x", newHeadRoot)).Debug("Head changed due to attestations")
}

// This processes fork choice attestations from the pool to account for validator votes and fork choice.
Expand Down

0 comments on commit 9809f5a

Please sign in to comment.