Skip to content

Commit

Permalink
Merge pull request #244 from kcalvinalvin/2025-01-03-handle-utreexopr…
Browse files Browse the repository at this point in the history
…oof-message

netsync, main: use utreexo proof message to validate blocks
  • Loading branch information
kcalvinalvin authored Jan 21, 2025
2 parents f6bb544 + d8922a8 commit 4b1efbd
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 19 deletions.
162 changes: 143 additions & 19 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync/atomic"
"time"

"github.com/utreexo/utreexo"
"github.com/utreexo/utreexod/blockchain"
"github.com/utreexo/utreexod/btcutil"
"github.com/utreexo/utreexod/chaincfg"
Expand Down Expand Up @@ -87,6 +88,13 @@ type utreexoHeaderMsg struct {
peer *peerpkg.Peer
}

// utreexoProofMsg packages a bitcoin utreexo header message and the peer it came from
// together so the block handler has access to that information.
type utreexoProofMsg struct {
proof *wire.MsgUtreexoProof
peer *peerpkg.Peer
}

// notFoundMsg packages a bitcoin notfound message and the peer it came from
// together so the block handler has access to that information.
type notFoundMsg struct {
Expand Down Expand Up @@ -169,6 +177,7 @@ type peerSyncState struct {
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedUtreexoHeaders map[chainhash.Hash]struct{}
requestedUtreexoProofs map[chainhash.Hash]struct{}
}

// limitAdd is a helper function for maps that require a maximum limit by
Expand Down Expand Up @@ -219,11 +228,14 @@ type SyncManager struct {
headersBuildMode bool

// The following fields are used for headers-first mode.
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader
numLeaves map[int32]uint64
queuedBlocks map[chainhash.Hash]*blockMsg
queuedUtreexoProofs map[chainhash.Hash]*utreexoProofMsg

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
Expand Down Expand Up @@ -586,6 +598,7 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedUtreexoHeaders: make(map[chainhash.Hash]struct{}),
requestedUtreexoProofs: make(map[chainhash.Hash]struct{}),
}

// Start syncing by choosing the best candidate if needed.
Expand Down Expand Up @@ -874,12 +887,43 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {

// Check if we've received the utreexo headers already.
if sm.chain.IsUtreexoViewActive() {
best := sm.chain.BestSnapshot()
if !best.Hash.IsEqual(&bmsg.block.MsgBlock().Header.PrevBlock) {
log.Warnf("got block %v out of order", bmsg.block.Hash())
sm.queuedBlocks[*blockHash] = bmsg
return
}

utreexoHeader, found := sm.utreexoHeaders[*bmsg.block.Hash()]
if !found {
log.Warnf("got block %v but don't have the associated "+
"utreexo header", bmsg.block.Hash())
sm.queuedBlocks[*blockHash] = bmsg
return
}

// We need the utreexo proof to be able to verify the block.
utreexoProofMsg, found := sm.queuedUtreexoProofs[*bmsg.block.Hash()]
if !found {
log.Warnf("got block %v but don't have the associated "+
"utreexo proof", bmsg.block.Hash())
sm.queuedBlocks[*blockHash] = bmsg
return
}

// We have all the data necessary to validate the block now so
// it's safee to remove this utreexo proof from the queue.
delete(sm.queuedUtreexoProofs, *bmsg.block.Hash())

udata := wire.UData{
AccProof: utreexo.Proof{
Targets: utreexoHeader.Targets,
Proof: utreexoProofMsg.proof.ProofHashes,
},
LeafDatas: utreexoProofMsg.proof.LeafDatas,
}

bmsg.block.MsgBlock().UData = &udata
bmsg.block.MsgBlock().UData.AccProof.Targets = utreexoHeader.Targets
}

Expand Down Expand Up @@ -1164,6 +1208,14 @@ func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {
continue
}

// If we're a csn then keep track of the numleaves. We need this to construct
// the get proof message.
if sm.chain.IsUtreexoViewActive() {
header := sm.utreexoHeaders[*node.hash]
numLeaves := sm.numLeaves[node.height-1] + uint64(header.NumAdds)
sm.numLeaves[node.height] = numLeaves
}

iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
haveInv, err := sm.haveInventory(iv)
if err != nil {
Expand Down Expand Up @@ -1196,6 +1248,22 @@ func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {

gdmsg.AddInvVect(iv)
numRequested++

// Immediately queue the utreexo proof for this block if we're a
// utreexo node.
if sm.chain.IsUtreexoViewActive() {
utreexoHeader, found := sm.utreexoHeaders[*node.hash]
if !found {
log.Warnf("Missing utreexo header for %v", node.hash)
return
}
syncPeerState.requestedUtreexoProofs[*node.hash] = struct{}{}

msg := wire.ConstructGetProofMsg(
node.hash, sm.numLeaves[node.height-1], utreexoHeader.Targets)
reqPeer.QueueMessage(msg, nil)
}

}
sm.startHeader = e.Next()
if numRequested >= wire.MaxInvPerMsg {
Expand Down Expand Up @@ -1615,6 +1683,33 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
sm.fetchHeaderBlocks(peer)
}

// handleUtreexoProofMsg queues the utreexo proof and if we already have the block,
// it'll send that block to be processed by the block handler.
func (sm *SyncManager) handleUtreexoProofMsg(hmsg *utreexoProofMsg) {
peer := hmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received utreexo proof message from unknown peer %s", peer)
return
}

blockHash := hmsg.proof.BlockHash
if _, exists = state.requestedUtreexoProofs[blockHash]; !exists {
log.Warnf("Got unrequested utreexo proof %v from %s -- "+
"disconnecting", blockHash, peer.Addr())
peer.Disconnect()
return
}

sm.queuedUtreexoProofs[blockHash] = hmsg

bmsg, haveBlock := sm.queuedBlocks[blockHash]
if haveBlock {
bmsg.reply = make(chan struct{}, 1)
sm.msgChan <- bmsg
}
}

// handleNotFoundMsg handles notfound messages from all peers.
func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
peer := nfmsg.peer
Expand Down Expand Up @@ -2100,6 +2195,9 @@ out:
case *utreexoHeaderMsg:
sm.handleUtreexoHeaderMsg(msg)

case *utreexoProofMsg:
sm.handleUtreexoProofMsg(msg)

case *notFoundMsg:
sm.handleNotFoundMsg(msg)

Expand Down Expand Up @@ -2342,6 +2440,17 @@ func (sm *SyncManager) QueueUtreexoHeader(header *wire.MsgUtreexoHeader, peer *p
sm.msgChan <- &utreexoHeaderMsg{header: header, peer: peer}
}

// QueueUtreexoProof adds the utreexo proof to the block handling queue.
func (sm *SyncManager) QueueUtreexoProof(proof *wire.MsgUtreexoProof, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// headers messages.
if atomic.LoadInt32(&sm.shutdown) != 0 {
return
}

sm.msgChan <- &utreexoProofMsg{proof: proof, peer: peer}
}

// QueueNotFound adds the passed notfound message and peer to the block handling
// queue.
func (sm *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
Expand Down Expand Up @@ -2429,20 +2538,23 @@ func (sm *SyncManager) Pause() chan<- struct{} {
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
sm := SyncManager{
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
numLeaves: make(map[int32]uint64),
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
queuedUtreexoProofs: make(map[chainhash.Hash]*utreexoProofMsg),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()
Expand All @@ -2460,6 +2572,18 @@ func New(config *Config) (*SyncManager, error) {
sm.headersBuildMode = true
}

// The utreexo header contains the number of added leaves in the block. This
// number added with the numleaves from the previous block gets us the numleaves
// for the current block. Since the numLeaves map is empty on startup, we need
// to put the numleaves for the best state here.
if sm.chain.IsUtreexoViewActive() {
utreexoView, err := sm.chain.FetchUtreexoViewpoint(&best.Hash)
if err != nil {
return nil, err
}
sm.numLeaves[best.Height] = utreexoView.NumLeaves()
}

sm.chain.Subscribe(sm.handleBlockchainNotification)

return &sm, nil
Expand Down
8 changes: 8 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ type MessageListeners struct {
// OnUtreexoHeader is invoked when a peer receives a headers bitcoin message.
OnUtreexoHeader func(p *Peer, msg *wire.MsgUtreexoHeader)

// OnUtreexoProof is invoked when a peer receives a utreexo proof bitcoin message.
OnUtreexoProof func(p *Peer, msg *wire.MsgUtreexoProof)

// OnGetUtreexoProof is invoked when a peer receives a utreexo proof bitcoin message.
OnGetUtreexoProof func(p *Peer, msg *wire.MsgGetUtreexoProof)

Expand Down Expand Up @@ -1483,6 +1486,11 @@ out:
p.cfg.Listeners.OnUtreexoHeader(p, msg)
}

case *wire.MsgUtreexoProof:
if p.cfg.Listeners.OnUtreexoProof != nil {
p.cfg.Listeners.OnUtreexoProof(p, msg)
}

case *wire.MsgGetUtreexoProof:
if p.cfg.Listeners.OnGetUtreexoProof != nil {
p.cfg.Listeners.OnGetUtreexoProof(p, msg)
Expand Down
6 changes: 6 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,11 @@ func (sp *serverPeer) OnGetUtreexoProof(_ *peer.Peer, msg *wire.MsgGetUtreexoPro
sp.QueueMessage(&utreexoProof, nil)
}

// OnUtreexoProof is invoked when a peer receives a utreexoproof bitcoin message.
func (sp *serverPeer) OnUtreexoProof(_ *peer.Peer, msg *wire.MsgUtreexoProof) {
sp.server.syncManager.QueueUtreexoProof(msg, sp.Peer)
}

// enforceNodeBloomFlag disconnects the peer if the server is not configured to
// allow bloom filters. Additionally, if the peer has negotiated to a protocol
// version that is high enough to observe the bloom filter service support bit,
Expand Down Expand Up @@ -2614,6 +2619,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnInv: sp.OnInv,
OnHeaders: sp.OnHeaders,
OnUtreexoHeader: sp.OnUtreexoHeader,
OnUtreexoProof: sp.OnUtreexoProof,
OnGetUtreexoProof: sp.OnGetUtreexoProof,
OnGetData: sp.OnGetData,
OnGetBlocks: sp.OnGetBlocks,
Expand Down

0 comments on commit 4b1efbd

Please sign in to comment.