From 9809f5ac778d5ec83c92510b606144d2ef9017b8 Mon Sep 17 00:00:00 2001 From: Potuz Date: Sat, 30 Dec 2023 09:20:20 -0300 Subject: [PATCH] Simplify fcu 1 (#13387) * Remove unsafe proposer indices cache * Simplify FCU #1 This PR starts the process of gradually simplifying FCU It removes the responsibility of getting the state and block from this function and informing if head has changed. It is only called when the imported block has actually become head. * Add a call to FCU in edge cases --- .../blockchain/forkchoice_update_execution.go | 44 +++++++-------- .../forkchoice_update_execution_test.go | 55 ++++++------------- beacon-chain/blockchain/process_block.go | 42 ++++++++++---- .../blockchain/receive_attestation.go | 33 ++++++----- 4 files changed, 84 insertions(+), 90 deletions(-) diff --git a/beacon-chain/blockchain/forkchoice_update_execution.go b/beacon-chain/blockchain/forkchoice_update_execution.go index 1efbf94a7612..eb2c9bb4da00 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution.go +++ b/beacon-chain/blockchain/forkchoice_update_execution.go @@ -44,50 +44,44 @@ func (s *Service) getStateAndBlock(ctx context.Context, r [32]byte) (state.Beaco return headState, newHeadBlock, nil } +type fcuConfig struct { + headState state.BeaconState + headBlock interfaces.ReadOnlySignedBeaconBlock + headRoot [32]byte + proposingSlot primitives.Slot +} + // fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made. -// it returns true if the new head is updated -func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte, proposingSlot primitives.Slot) (bool, error) { +func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error { _, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution") defer span.End() // Note: Use the service context here to avoid the parent context being ended during a forkchoice update. ctx = trace.NewContext(s.ctx, span) - - isNewHead := s.isNewHead(newHeadRoot) - if !isNewHead { - return false, nil - } - - headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot) - if err != nil { - log.WithError(err).Error("Could not get forkchoice update argument") - return false, nil - } - - _, tracked := s.trackedProposer(headState, proposingSlot) + _, tracked := s.trackedProposer(args.headState, args.proposingSlot) if (tracked || features.Get().PrepareAllPayloads) && !features.Get().DisableReorgLateBlocks { - if s.shouldOverrideFCU(newHeadRoot, proposingSlot) { - return false, nil + if s.shouldOverrideFCU(args.headRoot, args.proposingSlot) { + return nil } } - _, err = s.notifyForkchoiceUpdate(ctx, ¬ifyForkchoiceUpdateArg{ - headState: headState, - headRoot: newHeadRoot, - headBlock: headBlock.Block(), + _, err := s.notifyForkchoiceUpdate(ctx, ¬ifyForkchoiceUpdateArg{ + headState: args.headState, + headRoot: args.headRoot, + headBlock: args.headBlock.Block(), }) if err != nil { - return false, errors.Wrap(err, "could not notify forkchoice update") + return errors.Wrap(err, "could not notify forkchoice update") } - if err := s.saveHead(ctx, newHeadRoot, headBlock, headState); err != nil { + if err := s.saveHead(ctx, args.headRoot, args.headBlock, args.headState); err != nil { log.WithError(err).Error("could not save head") } // Only need to prune attestations from pool if the head has changed. - if err := s.pruneAttsFromPool(headBlock); err != nil { + if err := s.pruneAttsFromPool(args.headBlock); err != nil { log.WithError(err).Error("could not prune attestations from pool") } - return true, nil + return nil } // shouldOverrideFCU checks whether the incoming block is still subject to being diff --git a/beacon-chain/blockchain/forkchoice_update_execution_test.go b/beacon-chain/blockchain/forkchoice_update_execution_test.go index 36ef8a13e1b4..33273bacd66f 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution_test.go +++ b/beacon-chain/blockchain/forkchoice_update_execution_test.go @@ -58,33 +58,14 @@ func TestService_getHeadStateAndBlock(t *testing.T) { } func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { - hook := logTest.NewGlobal() ctx := context.Background() opts := testServiceOptsWithDB(t) service, err := NewService(ctx, opts...) require.NoError(t, err) service.cfg.PayloadIDCache = cache.NewPayloadIDCache() - _, err = service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1) - require.NoError(t, err) - hookErr := "could not notify forkchoice update" - invalidStateErr := "could not get state summary: could not find block in DB" - require.LogsDoNotContain(t, hook, invalidStateErr) - require.LogsDoNotContain(t, hook, hookErr) - gb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock()) - require.NoError(t, err) - require.NoError(t, service.saveInitSyncBlock(ctx, [32]byte{'a'}, gb)) - _, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}, service.CurrentSlot()+1) - require.NoError(t, err) - require.LogsContain(t, hook, invalidStateErr) + service.cfg.TrackedValidatorsCache = cache.NewTrackedValidatorsCache() - hook.Reset() - service.head = &head{ - root: [32]byte{'a'}, - block: nil, /* should not panic if notify head uses correct head */ - } - - // Block in Cache b := util.NewBeaconBlock() b.Block.Slot = 2 wsb, err := blocks.NewSignedBeaconBlock(b) @@ -99,12 +80,6 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { state: st, } service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1}) - _, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()) - require.NoError(t, err) - require.LogsDoNotContain(t, hook, invalidStateErr) - require.LogsDoNotContain(t, hook, hookErr) - - // Block in DB b = util.NewBeaconBlock() b.Block.Slot = 3 util.SaveBlock(t, ctx, service.cfg.BeaconDB, b) @@ -117,19 +92,17 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { state: st, } service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1}) - _, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1) - require.NoError(t, err) - require.LogsDoNotContain(t, hook, invalidStateErr) - require.LogsDoNotContain(t, hook, hookErr) + args := &fcuConfig{ + headState: st, + headRoot: r1, + headBlock: wsb, + proposingSlot: service.CurrentSlot() + 1, + } + require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args)) + payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2}) require.Equal(t, true, has) require.Equal(t, primitives.PayloadID{1}, payloadID) - - // Test zero headRoot returns immediately. - headRoot := service.headRoot() - _, err = service.forkchoiceUpdateWithExecution(ctx, [32]byte{}, service.CurrentSlot()+1) - require.NoError(t, err) - require.Equal(t, service.headRoot(), headRoot) } func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testing.T) { @@ -173,9 +146,13 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin service.head.block = sb service.head.state = st service.cfg.PayloadIDCache.Set(service.CurrentSlot()+1, [32]byte{} /* root */, [8]byte{}) - _, err = service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1) - require.NoError(t, err) - + args := &fcuConfig{ + headState: st, + headBlock: sb, + headRoot: r, + proposingSlot: service.CurrentSlot() + 1, + } + require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args)) } func TestShouldOverrideFCU(t *testing.T) { diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 444c88fe24e2..576c33304f88 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -73,6 +73,7 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn if err != nil { log.WithError(err).Warn("Could not update head") } + newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds())) if blockRoot != headRoot { receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot) if err != nil { @@ -88,10 +89,25 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn "headRoot": fmt.Sprintf("%#x", headRoot), "headWeight": headWeight, }).Debug("Head block is not the received block") - } - newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds())) - - if headRoot == blockRoot { + if s.isNewHead(headRoot) { + headState, headBlock, err := s.getStateAndBlock(ctx, headRoot) + if err != nil { + log.WithError(err).Error("Could not get forkchoice update argument") + return nil + } + // verify conditions for FCU, notifies FCU, and saves the new head. + // This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools. + args := &fcuConfig{ + headState: headState, + headBlock: headBlock, + headRoot: headRoot, + proposingSlot: s.CurrentSlot() + 1, + } + if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil { + return err + } + } + } else { // Updating next slot state cache can happen in the background // except in the epoch boundary in which case we lock to handle // the shuffling and proposer caches updates. @@ -114,19 +130,23 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn } }() } + // verify conditions for FCU, notifies FCU, and saves the new head. + // This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools. + args := &fcuConfig{ + headState: postState, + headBlock: signed, + headRoot: headRoot, + proposingSlot: s.CurrentSlot() + 1, + } + if err := s.forkchoiceUpdateWithExecution(ctx, args); err != nil { + return err + } } - // verify conditions for FCU, notifies FCU, and saves the new head. - // This function also prunes attestations, other similar operations happen in prunePostBlockOperationPools. - if _, err := s.forkchoiceUpdateWithExecution(ctx, headRoot, s.CurrentSlot()+1); err != nil { - return err - } - optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blockRoot) if err != nil { log.WithError(err).Debug("Could not check if block is optimistic") optimistic = true } - // Send notification of the processed block to the state feed. s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.BlockProcessed, diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 71e62c963702..961ea98a8395 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -130,28 +130,31 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds())) start = time.Now() + // return early if we haven't changed head newHeadRoot, err := s.cfg.ForkChoiceStore.Head(ctx) if err != nil { log.WithError(err).Error("Could not compute head from new attestations") - // Fallback to our current head root in the event of a failure. - s.headLock.RLock() - newHeadRoot = s.headRoot() - s.headLock.RUnlock() + return } - newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds())) - - changed, err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot) + if !s.isNewHead(newHeadRoot) { + return + } + headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot) if err != nil { - log.WithError(err).Error("could not update forkchoice") + log.WithError(err).Error("could not get head block") + return } - if changed { - s.headLock.RLock() - log.WithFields(logrus.Fields{ - "oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()), - "newHeadRoot": fmt.Sprintf("%#x", newHeadRoot), - }).Debug("Head changed due to attestations") - s.headLock.RUnlock() + newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds())) + args := &fcuConfig{ + headState: headState, + headRoot: newHeadRoot, + headBlock: headBlock, + proposingSlot: proposingSlot, + } + if err := s.forkchoiceUpdateWithExecution(s.ctx, args); err != nil { + log.WithError(err).Error("could not update forkchoice") } + log.WithField("newHeadRoot", fmt.Sprintf("%#x", newHeadRoot)).Debug("Head changed due to attestations") } // This processes fork choice attestations from the pool to account for validator votes and fork choice.