Skip to content

Commit

Permalink
1. update the rawdb package 2. remove the sync bloom 3. update the go…
Browse files Browse the repository at this point in the history
…-ethereum v1.10.18 4. update the dowmloader 5. update to the btcd/btcec/v2
  • Loading branch information
mapdev33 committed Jan 24, 2024
1 parent 467d9f6 commit 96e0bde
Show file tree
Hide file tree
Showing 39 changed files with 2,605 additions and 793 deletions.
77 changes: 37 additions & 40 deletions atlas/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"

"github.com/mapprotocol/atlas/atlas/protocols/eth"
"github.com/mapprotocol/atlas/atlas/protocols/snap"
"github.com/mapprotocol/atlas/consensus/istanbul"
Expand All @@ -55,8 +52,8 @@ var (
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
fullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
fullMaxForkAncestry uint64 = params2.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
lightMaxForkAncestry uint64 = params2.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)

reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
Expand Down Expand Up @@ -103,8 +100,7 @@ type Downloader struct {
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
stateDB ethdb.Database // Database to state sync into (and deduplicate via)

// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
Expand Down Expand Up @@ -220,7 +216,7 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
Expand All @@ -240,7 +236,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
}
dl := &Downloader{
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
Expand All @@ -259,7 +254,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
syncStatsState: stateSyncStats{
processed: rawdb.ReadFastTrieProgress(stateDb),
//processed: rawdb.ReadFastTrieProgress(stateDb),
},
trackStateReq: make(chan *stateReq),
ibftConsensus: ibftConsensus,
Expand Down Expand Up @@ -298,8 +293,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
PulledStates: d.syncStatsState.processed,
KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
//PulledStates: d.syncStatsState.processed,
//KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}

Expand Down Expand Up @@ -400,9 +395,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// If we are already full syncing, but have a fast-sync bloom filter laying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to fast sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
//if mode == FullSync && d.stateBloom != nil {
// d.stateBloom.Close()
//}
// If snap sync was requested, create the snap scheduler and switch to fast
// sync mode. Long term we could drop fast sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
Expand Down Expand Up @@ -657,9 +652,9 @@ func (d *Downloader) Terminate() {
default:
close(d.quitCh)
}
if d.stateBloom != nil {
d.stateBloom.Close()
}
//if d.stateBloom != nil {
// d.stateBloom.Close()
//}
d.quitLock.Unlock()

// Cancel any pending download requests
Expand Down Expand Up @@ -740,9 +735,11 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
// common ancestor.
// It returns parameters to be used for peer.RequestHeadersByNumber:
// from - starting block number
// count - number of headers to request
// skip - number of headers to skip
//
// from - starting block number
// count - number of headers to request
// skip - number of headers to skip
//
// and also returns 'max', the last block which is expected to be returned by the remote peers,
// given the (from,count,skip)
func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
Expand Down Expand Up @@ -1362,22 +1359,22 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
// - pending: task callback for the number of requests still needing download (detect completion/non-completability)
// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
// - fetch: network callback to actually send a particular download request to a physical remote peer
// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
// - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log messages
// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
// - pending: task callback for the number of requests still needing download (detect completion/non-completability)
// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
// - fetch: network callback to actually send a particular download request to a physical remote peer
// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
// - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
Expand Down Expand Up @@ -2027,9 +2024,9 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
// a rollback after committing the pivot and restarting fast sync, we don't end
// up using a nil bloom. Empty bloom is fine, it just returns that it does not
// have the info we need, so reach down to the database instead.
if d.stateBloom != nil {
d.stateBloom.Close()
}
//if d.stateBloom != nil {
// d.stateBloom.Close()
//}
return nil
}

Expand Down
9 changes: 4 additions & 5 deletions atlas/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/trie"
"golang.org/x/crypto/sha3"

"github.com/mapprotocol/atlas/core/rawdb"
"github.com/mapprotocol/atlas/core/state"
)

Expand Down Expand Up @@ -299,7 +298,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil),
sched: state.NewStateSync(root, d.stateDB, nil),
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
trieTasks: make(map[common.Hash]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
Expand Down Expand Up @@ -610,7 +609,7 @@ func (s *stateSync) updateStats(written, duplicate, unexpected int, duration tim
if written > 0 || duplicate > 0 || unexpected > 0 {
log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "trieretry", len(s.trieTasks), "coderetry", len(s.codeTasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
}
if written > 0 {
rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
}
//if written > 0 {
// rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
//}
}
17 changes: 8 additions & 9 deletions atlas/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"

"github.com/mapprotocol/atlas/atlas/downloader"
"github.com/mapprotocol/atlas/atlas/fetcher"
"github.com/mapprotocol/atlas/atlas/protocols/eth"
Expand All @@ -42,6 +40,7 @@ import (
"github.com/mapprotocol/atlas/core/forkid"
"github.com/mapprotocol/atlas/core/types"
"github.com/mapprotocol/atlas/p2p"
params2 "github.com/mapprotocol/atlas/params"
)

const (
Expand Down Expand Up @@ -112,8 +111,8 @@ type handler struct {
chain *chain.BlockChain
maxPeers int

downloader *downloader.Downloader
stateBloom *trie.SyncBloom
downloader *downloader.Downloader
//stateBloom *trie.SyncBloom
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *peerSet
Expand Down Expand Up @@ -191,7 +190,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
// If we have trusted checkpoints, enforce them on the chain
if config.Checkpoint != nil {
h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params2.CHTFrequency - 1
h.checkpointHash = config.Checkpoint.SectionHead
}
// Construct the downloader (long sync) and its backing state bloom if fast
Expand All @@ -201,10 +200,10 @@ func newHandler(config *handlerConfig) (*handler, error) {
// and the heal-portion of the snap sync is much lighter than fast. What we particularly
// want to avoid, is a 90%-finished (but restarted) snap-sync to begin
// indexing the entire trie
if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
//if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
// h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
//}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
11 changes: 5 additions & 6 deletions atlas/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/trie"

"github.com/mapprotocol/atlas/atlas/protocols/eth"
"github.com/mapprotocol/atlas/core/chain"
"github.com/mapprotocol/atlas/core/types"
Expand All @@ -37,9 +35,10 @@ import (
// packets that are sent as replies or broadcasts.
type ethHandler handler

func (h *ethHandler) Chain() *chain.BlockChain { return h.chain }
func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }
func (h *ethHandler) Chain() *chain.BlockChain { return h.chain }

// func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }

// RunPeer is invoked when a peer joins on the `eth` protocol.
func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {
Expand Down Expand Up @@ -163,7 +162,7 @@ func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) erro

// handleBodies is invoked from a peer's message handler when it transmits a batch
// of block bodies for the local node to process.
//func (h *ethHandler) handleBodies(peer *eth.Peer, txs [][]*types.Transaction, uncles [][]*types.Header) error {
// func (h *ethHandler) handleBodies(peer *eth.Peer, txs [][]*types.Transaction, uncles [][]*types.Header) error {
func (h *ethHandler) handleBodies(peer *eth.Peer, blockHashes []common.Hash, transactions [][]*types.Transaction, randomness []*types.Randomness, epochSnarkData []*types.EpochSnarkData) error {
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(blockHashes) > 0 || len(transactions) > 0 || len(randomness) > 0 || len(epochSnarkData) > 0
Expand Down
4 changes: 1 addition & 3 deletions atlas/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/trie"

"github.com/mapprotocol/atlas/consensus"
"github.com/mapprotocol/atlas/core/chain"
"github.com/mapprotocol/atlas/core/types"
Expand Down Expand Up @@ -73,7 +71,7 @@ type Backend interface {
Chain() *chain.BlockChain

// StateBloom retrieves the bloom filter - if any - for state trie nodes.
StateBloom() *trie.SyncBloom
//StateBloom() *trie.SyncBloom

// TxPool retrieves the transaction pool object to serve data.
TxPool() TxPool
Expand Down
5 changes: 0 additions & 5 deletions atlas/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer
lookups >= 2*maxNodeDataServe {
break
}
// Retrieve the requested state entry
if bloom := backend.StateBloom(); bloom != nil && !bloom.Contains(hash[:]) {
// Only lookup the trie node if there's chance that we actually have it
continue
}
entry, err := backend.Chain().TrieNode(hash)
if len(entry) == 0 || err != nil {
// Read the contract code with prefix only to save unnecessary lookups.
Expand Down
2 changes: 1 addition & 1 deletion atlas/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.lock.Lock()
s.root = root
s.healer = &healTask{
scheduler: state.NewStateSync(root, s.db, nil, s.onHealState),
scheduler: state.NewStateSync(root, s.db, s.onHealState),
trieTasks: make(map[common.Hash]trie.SyncPath),
codeTasks: make(map[common.Hash]struct{}),
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/dbcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func freezerInspect(ctx *cli.Context) error {
defer stack.Close()
path := filepath.Join(stack.ResolvePath("chaindata"), "ancient")
log.Info("Opening freezer", "location", path, "name", kind)
if f, err := rawdb.NewFreezerTable(path, kind, disableSnappy); err != nil {
if f, err := rawdb.NewFreezerTable(path, kind, disableSnappy, true); err != nil {
return err
} else {
f.DumpIndex(start, end)
Expand Down
6 changes: 3 additions & 3 deletions core/chain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64,
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num); err != nil {
if err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
Expand Down Expand Up @@ -1261,7 +1261,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
Expand All @@ -1277,7 +1277,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, core.ErrSideChainReceipts
Expand Down
Loading

0 comments on commit 96e0bde

Please sign in to comment.