Skip to content

Commit

Permalink
remove unnecessary db writing for pending crosslinks (#3751)
Browse files Browse the repository at this point in the history
* remove unnecessary db writing for crosslinks

* Fix comment and add log

* change comment
  • Loading branch information
rlan35 authored Jun 9, 2021
1 parent 3e851a8 commit d9489bd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
51 changes: 31 additions & 20 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}

if err := bc.SavePendingCrossLinks(); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to save pending cross links")
}

// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
Expand Down Expand Up @@ -2070,26 +2075,29 @@ func (bc *BlockChain) ReadPendingSlashingCandidates() slash.Records {

// ReadPendingCrossLinks retrieves pending crosslinks
func (bc *BlockChain) ReadPendingCrossLinks() ([]types.CrossLink, error) {
cls := []types.CrossLink{}
bytes := []byte{}
if cached, ok := bc.pendingCrossLinksCache.Get(pendingCLCacheKey); ok {
bytes = cached.([]byte)
cls = cached.([]types.CrossLink)
return cls, nil
} else {
by, err := rawdb.ReadPendingCrossLinks(bc.db)
if err != nil || len(by) == 0 {
return nil, err
}
bytes = by
}
cls := []types.CrossLink{}
if err := rlp.DecodeBytes(bytes, &cls); err != nil {
utils.Logger().Error().Err(err).Msg("Invalid pending crosslink RLP decoding")
return nil, err
}

bc.pendingCrossLinksCache.Add(pendingCLCacheKey, cls)
return cls, nil
}

// WritePendingCrossLinks saves the pending crosslinks
func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error {
// CachePendingCrossLinks caches the pending crosslinks in memory
func (bc *BlockChain) CachePendingCrossLinks(crossLinks []types.CrossLink) error {
// deduplicate crosslinks if any
m := map[uint32]map[uint64]types.CrossLink{}
for _, cl := range crossLinks {
Expand All @@ -2105,22 +2113,25 @@ func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error
cls = append(cls, cl)
}
}
utils.Logger().Debug().Msgf("[WritePendingCrossLinks] Before Dedup has %d cls, after Dedup has %d cls", len(crossLinks), len(cls))
utils.Logger().Debug().Msgf("[CachePendingCrossLinks] Before Dedup has %d cls, after Dedup has %d cls", len(crossLinks), len(cls))

bytes, err := rlp.EncodeToBytes(cls)
if err != nil {
utils.Logger().Error().Msg("[WritePendingCrossLinks] Failed to encode pending crosslinks")
return err
}
if err := rawdb.WritePendingCrossLinks(bc.db, bytes); err != nil {
return err
}
by, err := rlp.EncodeToBytes(cls)
if err == nil {
bc.pendingCrossLinksCache.Add(pendingCLCacheKey, by)
}
bc.pendingCrossLinksCache.Add(pendingCLCacheKey, cls)
return nil
}

// SavePendingCrossLinks saves the pending crosslinks in db
func (bc *BlockChain) SavePendingCrossLinks() error {
if cached, ok := bc.pendingCrossLinksCache.Get(pendingCLCacheKey); ok {
cls := cached.([]types.CrossLink)
bytes, err := rlp.EncodeToBytes(cls)
if err != nil {
return err
}
if err := rawdb.WritePendingCrossLinks(bc.db, bytes); err != nil {
return err
}
}
return nil
}

// AddPendingSlashingCandidates appends pending slashing candidates
Expand Down Expand Up @@ -2163,11 +2174,11 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e

cls, err := bc.ReadPendingCrossLinks()
if err != nil || len(cls) == 0 {
err := bc.WritePendingCrossLinks(pendingCLs)
err := bc.CachePendingCrossLinks(pendingCLs)
return len(pendingCLs), err
}
cls = append(cls, pendingCLs...)
err = bc.WritePendingCrossLinks(cls)
err = bc.CachePendingCrossLinks(cls)
return len(cls), err
}

Expand Down Expand Up @@ -2199,7 +2210,7 @@ func (bc *BlockChain) DeleteFromPendingCrossLinks(crossLinks []types.CrossLink)
}
pendingCLs = append(pendingCLs, cl)
}
err = bc.WritePendingCrossLinks(pendingCLs)
err = bc.CachePendingCrossLinks(pendingCLs)
return len(pendingCLs), err
}

Expand Down
4 changes: 4 additions & 0 deletions node/node_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
}
payloadSize += len(encoded)
if payloadSize > getBlocksRequestHardCap {
utils.Logger().Warn().Err(err).
Int("req size", len(request.Hashes)).
Int("cur size", len(response.Payload)).
Msg("[SYNC] Max blocks response size reached, ignoring the rest.")
break
}
response.Payload = append(response.Payload, encoded)
Expand Down

0 comments on commit d9489bd

Please sign in to comment.