From 72111832932ccceb310477c428da1f0b248085c5 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Wed, 24 Jul 2019 15:41:41 -0400 Subject: [PATCH 1/9] Update CValidationInterface::BlockDisconnect to take a CBlockIndex Added CBlockIndex parameter points to the block being disconnected. Pass block height in Chain::BlockConnected/Chain::BlockDisconnected. --- src/interfaces/chain.cpp | 6 +++--- src/interfaces/chain.h | 4 ++-- src/test/validation_block_tests.cpp | 3 ++- src/validation.cpp | 2 +- src/validationinterface.cpp | 11 ++++++----- src/validationinterface.h | 4 ++-- src/wallet/wallet.cpp | 6 ++++-- src/wallet/wallet.h | 4 ++-- src/zmq/zmqnotificationinterface.cpp | 2 +- src/zmq/zmqnotificationinterface.h | 2 +- 10 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp index 22e4aaedafc51..423e8538904f1 100644 --- a/src/interfaces/chain.cpp +++ b/src/interfaces/chain.cpp @@ -188,11 +188,11 @@ class NotificationsHandlerImpl : public Handler, CValidationInterface const CBlockIndex* index, const std::vector& tx_conflicted) override { - m_notifications->BlockConnected(*block, tx_conflicted); + m_notifications->BlockConnected(*block, tx_conflicted, index->nHeight); } - void BlockDisconnected(const std::shared_ptr& block) override + void BlockDisconnected(const std::shared_ptr& block, const CBlockIndex* index) override { - m_notifications->BlockDisconnected(*block); + m_notifications->BlockDisconnected(*block, index->nHeight); } void UpdatedBlockTip(const CBlockIndex* index, const CBlockIndex* fork_index, bool is_ibd) override { diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index e675defd47375..5e230c156f03e 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -231,8 +231,8 @@ class Chain virtual ~Notifications() {} virtual void TransactionAddedToMempool(const CTransactionRef& tx) {} virtual void TransactionRemovedFromMempool(const CTransactionRef& ptx) {} - virtual void BlockConnected(const CBlock& block, const std::vector& tx_conflicted) {} - virtual void BlockDisconnected(const CBlock& block) {} + virtual void BlockConnected(const CBlock& block, const std::vector& tx_conflicted, int height) {} + virtual void BlockDisconnected(const CBlock& block, int height) {} virtual void UpdatedBlockTip() {} virtual void ChainStateFlushed(const CBlockLocator& locator) {} }; diff --git a/src/test/validation_block_tests.cpp b/src/test/validation_block_tests.cpp index b3368d44b6187..a71de71f64881 100644 --- a/src/test/validation_block_tests.cpp +++ b/src/test/validation_block_tests.cpp @@ -44,9 +44,10 @@ struct TestSubscriber : public CValidationInterface { m_expected_tip = block->GetHash(); } - void BlockDisconnected(const std::shared_ptr& block) override + void BlockDisconnected(const std::shared_ptr& block, const CBlockIndex* pindex) override { BOOST_CHECK_EQUAL(m_expected_tip, block->GetHash()); + BOOST_CHECK_EQUAL(m_expected_tip, pindex->GetBlockHash()); m_expected_tip = block->hashPrevBlock; } diff --git a/src/validation.cpp b/src/validation.cpp index b4677df62f179..05c1db44c56eb 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2221,7 +2221,7 @@ bool CChainState::DisconnectTip(CValidationState& state, const CChainParams& cha UpdateTip(pindexDelete->pprev, chainparams); // Let wallets know transactions went from 1-confirmed to // 0-confirmed or conflicted: - GetMainSignals().BlockDisconnected(pblock); + GetMainSignals().BlockDisconnected(pblock, pindexDelete); return true; } diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 59a620ab954af..0dbedc3617fcb 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -31,7 +31,7 @@ struct MainSignalsInstance { boost::signals2::signal UpdatedBlockTip; boost::signals2::signal TransactionAddedToMempool; boost::signals2::signal &, const CBlockIndex *pindex, const std::vector&)> BlockConnected; - boost::signals2::signal &)> BlockDisconnected; + boost::signals2::signal&, const CBlockIndex* pindex)> BlockDisconnected; boost::signals2::signal TransactionRemovedFromMempool; boost::signals2::signal ChainStateFlushed; boost::signals2::signal BlockChecked; @@ -94,7 +94,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) { conns.UpdatedBlockTip = g_signals.m_internals->UpdatedBlockTip.connect(std::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); conns.TransactionAddedToMempool = g_signals.m_internals->TransactionAddedToMempool.connect(std::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, std::placeholders::_1)); conns.BlockConnected = g_signals.m_internals->BlockConnected.connect(std::bind(&CValidationInterface::BlockConnected, pwalletIn, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - conns.BlockDisconnected = g_signals.m_internals->BlockDisconnected.connect(std::bind(&CValidationInterface::BlockDisconnected, pwalletIn, std::placeholders::_1)); + conns.BlockDisconnected = g_signals.m_internals->BlockDisconnected.connect(std::bind(&CValidationInterface::BlockDisconnected, pwalletIn, std::placeholders::_1, std::placeholders::_2)); conns.TransactionRemovedFromMempool = g_signals.m_internals->TransactionRemovedFromMempool.connect(std::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, std::placeholders::_1)); conns.ChainStateFlushed = g_signals.m_internals->ChainStateFlushed.connect(std::bind(&CValidationInterface::ChainStateFlushed, pwalletIn, std::placeholders::_1)); conns.BlockChecked = g_signals.m_internals->BlockChecked.connect(std::bind(&CValidationInterface::BlockChecked, pwalletIn, std::placeholders::_1, std::placeholders::_2)); @@ -158,9 +158,10 @@ void CMainSignals::BlockConnected(const std::shared_ptr &pblock, c }); } -void CMainSignals::BlockDisconnected(const std::shared_ptr &pblock) { - m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] { - m_internals->BlockDisconnected(pblock); +void CMainSignals::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) +{ + m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, this] { + m_internals->BlockDisconnected(pblock, pindex); }); } diff --git a/src/validationinterface.h b/src/validationinterface.h index 3ce617b82711e..0f59e4eddce02 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -114,7 +114,7 @@ class CValidationInterface { * * Called on a background thread. */ - virtual void BlockDisconnected(const std::shared_ptr &block) {} + virtual void BlockDisconnected(const std::shared_ptr& block, const CBlockIndex* pindex) {} /** * Notifies listeners of the new active block chain on-disk. * @@ -178,7 +178,7 @@ class CMainSignals { void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const CTransactionRef &); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex, const std::shared_ptr> &); - void BlockDisconnected(const std::shared_ptr &); + void BlockDisconnected(const std::shared_ptr&, const CBlockIndex* pindex); void ChainStateFlushed(const CBlockLocator &); void BlockChecked(const CBlock&, const CValidationState&); void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr&); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index ef5f0a61e1d08..1dd1dd5711a6e 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1412,7 +1412,8 @@ void CWallet::TransactionRemovedFromMempool(const CTransactionRef &ptx) { } } -void CWallet::BlockConnected(const CBlock& block, const std::vector& vtxConflicted) { +void CWallet::BlockConnected(const CBlock& block, const std::vector& vtxConflicted, int height) +{ const uint256& block_hash = block.GetHash(); auto locked_chain = chain().lock(); LOCK(cs_wallet); @@ -1436,7 +1437,8 @@ void CWallet::BlockConnected(const CBlock& block, const std::vector& vtxConflicted) override; - void BlockDisconnected(const CBlock& block) override; + void BlockConnected(const CBlock& block, const std::vector& vtxConflicted, int height) override; + void BlockDisconnected(const CBlock& block, int height) override; void UpdatedBlockTip() override; int64_t RescanFromTime(int64_t startTime, const WalletRescanReserver& reserver, bool update); diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index de59b71b8f26f..1442aadb45743 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -186,7 +186,7 @@ void CZMQNotificationInterface::BlockConnected(const std::shared_ptr& pblock) +void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { // Do a normal notify for each transaction removed in block disconnection diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index a0cc26a162b8f..13cc22f969dea 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -29,7 +29,7 @@ class CZMQNotificationInterface final : public CValidationInterface // CValidationInterface void TransactionAddedToMempool(const CTransactionRef& tx) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected, const std::vector& vtxConflicted) override; - void BlockDisconnected(const std::shared_ptr& pblock) override; + void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; private: From 533a293af8543393af6a0f48949819e2336057c4 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Thu, 8 Aug 2019 21:56:52 -0400 Subject: [PATCH 2/9] Add a UndoReadFromDisk without block index pointer Moving indexes as clients of the Chain interface means we should avoid to leak node internals to them if in the future they move further in their own memory spaces --- src/rpc/blockchain.cpp | 2 +- src/test/blockfilter_index_tests.cpp | 2 +- src/validation.cpp | 9 ++++----- src/validation.h | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index b7dcd59c6dd1d..33fdef32179c5 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -808,7 +808,7 @@ static CBlockUndo GetUndoChecked(const CBlockIndex* pblockindex) throw JSONRPCError(RPC_MISC_ERROR, "Undo data not available (pruned data)"); } - if (!UndoReadFromDisk(blockUndo, pblockindex)) { + if (!UndoReadFromDisk(blockUndo, pblockindex->GetUndoPos(), pblockindex->pprev->GetBlockHash())) { throw JSONRPCError(RPC_MISC_ERROR, "Can't read undo data from disk"); } diff --git a/src/test/blockfilter_index_tests.cpp b/src/test/blockfilter_index_tests.cpp index cf87aa9303b32..bfd3ad578a4d1 100644 --- a/src/test/blockfilter_index_tests.cpp +++ b/src/test/blockfilter_index_tests.cpp @@ -26,7 +26,7 @@ static bool ComputeFilter(BlockFilterType filter_type, const CBlockIndex* block_ } CBlockUndo block_undo; - if (block_index->nHeight > 0 && !UndoReadFromDisk(block_undo, block_index)) { + if (block_index->nHeight > 0 && !UndoReadFromDisk(block_undo, block_index->GetUndoPos(), pindex->prev->GetBlockHash())) { return false; } diff --git a/src/validation.cpp b/src/validation.cpp index 05c1db44c56eb..0932fcaa8f102 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1372,9 +1372,8 @@ static bool UndoWriteToDisk(const CBlockUndo& blockundo, FlatFilePos& pos, const return true; } -bool UndoReadFromDisk(CBlockUndo& blockundo, const CBlockIndex* pindex) +bool UndoReadFromDisk(CBlockUndo& blockundo, FlatFilePos pos, uint256 prev_hash) { - FlatFilePos pos = pindex->GetUndoPos(); if (pos.IsNull()) { return error("%s: no undo data available", __func__); } @@ -1388,7 +1387,7 @@ bool UndoReadFromDisk(CBlockUndo& blockundo, const CBlockIndex* pindex) uint256 hashChecksum; CHashVerifier verifier(&filein); // We need a CHashVerifier as reserializing may lose data try { - verifier << pindex->pprev->GetBlockHash(); + verifier << prev_hash; verifier >> blockundo; filein >> hashChecksum; } @@ -1464,7 +1463,7 @@ DisconnectResult CChainState::DisconnectBlock(const CBlock& block, const CBlockI bool fClean = true; CBlockUndo blockUndo; - if (!UndoReadFromDisk(blockUndo, pindex)) { + if (!UndoReadFromDisk(blockUndo, pindex->GetUndoPos(), pindex->pprev->GetBlockHash())) { error("DisconnectBlock(): failure reading undo data"); return DISCONNECT_FAILED; } @@ -3937,7 +3936,7 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview, if (nCheckLevel >= 2 && pindex) { CBlockUndo undo; if (!pindex->GetUndoPos().IsNull()) { - if (!UndoReadFromDisk(undo, pindex)) { + if (!UndoReadFromDisk(undo, pindex->GetUndoPos(), pindex->pprev->GetBlockHash())) { return error("VerifyDB(): *** found bad undo data at %d, hash=%s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); } } diff --git a/src/validation.h b/src/validation.h index d747fdbf27c6c..63eab7b95f831 100644 --- a/src/validation.h +++ b/src/validation.h @@ -373,7 +373,7 @@ bool ReadBlockFromDisk(CBlock& block, const CBlockIndex* pindex, const Consensus bool ReadRawBlockFromDisk(std::vector& block, const FlatFilePos& pos, const CMessageHeader::MessageStartChars& message_start); bool ReadRawBlockFromDisk(std::vector& block, const CBlockIndex* pindex, const CMessageHeader::MessageStartChars& message_start); -bool UndoReadFromDisk(CBlockUndo& blockundo, const CBlockIndex* pindex); +bool UndoReadFromDisk(CBlockUndo& blockundo, FlatFilePos pos, uint256 prev_hash); /** Functions for validating blocks and updating the block tree */ From 0a3416ec16bfc087d5af5e93144719900f8bd4d0 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Sun, 18 Aug 2019 21:52:40 -0400 Subject: [PATCH 3/9] Extend BlockConnected in Chain::Notifications to pass block height and undo file position Beyond processing block height/hash and its transactions, Chain client may need to know block position or filters in its database. --- src/index/blockfilterindex.cpp | 2 +- src/interfaces/chain.cpp | 2 +- src/interfaces/chain.h | 3 ++- src/wallet/wallet.cpp | 3 ++- src/wallet/wallet.h | 2 +- 5 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/index/blockfilterindex.cpp b/src/index/blockfilterindex.cpp index c3ce8d7af0d3d..979649fc6e2b4 100644 --- a/src/index/blockfilterindex.cpp +++ b/src/index/blockfilterindex.cpp @@ -218,7 +218,7 @@ bool BlockFilterIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex uint256 prev_header; if (pindex->nHeight > 0) { - if (!UndoReadFromDisk(block_undo, pindex)) { + if (!UndoReadFromDisk(block_undo, pindex->GetUndoPos(), pindex->GetBlockHash())) { return false; } diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp index 423e8538904f1..c2933b4bd5a44 100644 --- a/src/interfaces/chain.cpp +++ b/src/interfaces/chain.cpp @@ -188,7 +188,7 @@ class NotificationsHandlerImpl : public Handler, CValidationInterface const CBlockIndex* index, const std::vector& tx_conflicted) override { - m_notifications->BlockConnected(*block, tx_conflicted, index->nHeight); + m_notifications->BlockConnected(*block, tx_conflicted, index->nHeight, index->GetUndoPos()); } void BlockDisconnected(const std::shared_ptr& block, const CBlockIndex* index) override { diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index 5e230c156f03e..743029d06aacc 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -5,6 +5,7 @@ #ifndef BITCOIN_INTERFACES_CHAIN_H #define BITCOIN_INTERFACES_CHAIN_H +#include // For FlatFilePos #include // For Optional and nullopt #include // For CTransactionRef @@ -231,7 +232,7 @@ class Chain virtual ~Notifications() {} virtual void TransactionAddedToMempool(const CTransactionRef& tx) {} virtual void TransactionRemovedFromMempool(const CTransactionRef& ptx) {} - virtual void BlockConnected(const CBlock& block, const std::vector& tx_conflicted, int height) {} + virtual void BlockConnected(const CBlock& block, const std::vector& tx_conflicted, int height, FlatFilePos undo_pos) {} virtual void BlockDisconnected(const CBlock& block, int height) {} virtual void UpdatedBlockTip() {} virtual void ChainStateFlushed(const CBlockLocator& locator) {} diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 1dd1dd5711a6e..720fd00ab379f 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1412,7 +1412,8 @@ void CWallet::TransactionRemovedFromMempool(const CTransactionRef &ptx) { } } -void CWallet::BlockConnected(const CBlock& block, const std::vector& vtxConflicted, int height) +void CWallet::BlockConnected(const CBlock& block, const std::vector& vtxConflicted, + int height, FlatFilePos undo_pos) { const uint256& block_hash = block.GetHash(); auto locked_chain = chain().lock(); diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index 9d042a772b441..bf3326a84acd3 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -1069,7 +1069,7 @@ class CWallet final : public FillableSigningProvider, private interfaces::Chain: bool AddToWallet(const CWalletTx& wtxIn, bool fFlushOnClose=true); void LoadToWallet(const CWalletTx& wtxIn) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void TransactionAddedToMempool(const CTransactionRef& tx) override; - void BlockConnected(const CBlock& block, const std::vector& vtxConflicted, int height) override; + void BlockConnected(const CBlock& block, const std::vector& vtxConflicted, int height, FlatFilePos undo_pos) override; void BlockDisconnected(const CBlock& block, int height) override; void UpdatedBlockTip() override; int64_t RescanFromTime(int64_t startTime, const WalletRescanReserver& reserver, bool update); From e545d6e21f74d1af422ab0dcde6c544725f3ddd4 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Sun, 18 Aug 2019 21:55:07 -0400 Subject: [PATCH 4/9] Add a Rewind call in Chain::Notifications Instead of calling block disconnected multiple times and passing a block with irrelevant data we instead send common ancestor height between forked tip and new one. Thanks to this ancestor height Chain client can rollback its interior state and then process to sync forward until new tip. --- src/interfaces/chain.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index 743029d06aacc..4217927ab8609 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -236,6 +236,7 @@ class Chain virtual void BlockDisconnected(const CBlock& block, int height) {} virtual void UpdatedBlockTip() {} virtual void ChainStateFlushed(const CBlockLocator& locator) {} + virtual void Rewind(int forked_height, int ancestor_height) {} }; //! Register handler for notifications. From 5d42cf27d1b83801f6d1508bf73c4bd49bc0e9b0 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Sun, 18 Aug 2019 22:37:33 -0400 Subject: [PATCH 5/9] Add a HandleNotifications call in Chain::Notifications Chain clients subscribing rescan requests to the Chain interface get their demand processed in ThreadServiceRequests. When tip is reached, thread call HandleNotifications to switch them to the Chain notifications handler, itself an adaptator between CValidationInterface and Chain clients. --- src/interfaces/chain.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index 4217927ab8609..d2ad9d4067786 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -237,6 +237,7 @@ class Chain virtual void UpdatedBlockTip() {} virtual void ChainStateFlushed(const CBlockLocator& locator) {} virtual void Rewind(int forked_height, int ancestor_height) {} + virtual void HandleNotifications() {} }; //! Register handler for notifications. From 5b947f84b53cc41099e1bf927d4a93cfdbfc67e0 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Thu, 8 Aug 2019 14:00:23 -0400 Subject: [PATCH 6/9] Add Rescan class Rescan is moving the rescan logic on the node side. A single thread (ThreadServiceRequests) read all received requests from clients and service them in order during initialization. This commit does not change behavior. It just a class that doesn't do anything yet. --- src/Makefile.am | 2 + src/node/rescan.cpp | 108 ++++++++++++++++++++++++++++++++++++++++++++ src/node/rescan.h | 48 ++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 src/node/rescan.cpp create mode 100644 src/node/rescan.h diff --git a/src/Makefile.am b/src/Makefile.am index ef5b1900d9d03..85ddaf4bc06e4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -156,6 +156,7 @@ BITCOIN_CORE_H = \ netmessagemaker.h \ node/coin.h \ node/psbt.h \ + node/rescan.h \ node/transaction.h \ noui.h \ optional.h \ @@ -277,6 +278,7 @@ libbitcoin_server_a_SOURCES = \ net_processing.cpp \ node/coin.cpp \ node/psbt.cpp \ + node/rescan.cpp \ node/transaction.cpp \ noui.cpp \ policy/fees.cpp \ diff --git a/src/node/rescan.cpp b/src/node/rescan.cpp new file mode 100644 index 0000000000000..68428cf9506ff --- /dev/null +++ b/src/node/rescan.cpp @@ -0,0 +1,108 @@ +// Copyright (c) 2019 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include +#include +#include + +void Rescan::ThreadServiceRequests() +{ + auto& consensus_params = Params().GetConsensus(); + while (!m_interrupt) { + // Loop for the earliest start position among requests. + const CBlockIndex* min_start = nullptr; + const CBlockIndex* ancestor = nullptr; + for (auto& request : m_request_start) { + // If locator has not been found, request is handled + // since genesis. + if (!request.second) { + LOCK(cs_main); + request.second = ChainActive().Genesis(); + if (!request.second) continue; + } + // Find fork between chain tip and client + // local view, if there is one, ask client to rewind + // its state until common ancestor + { + LOCK(cs_main); + ancestor = ChainActive().FindFork(request.second); + } + if (ancestor && ancestor->nHeight != (request.second)->nHeight) { + (request.first)->Rewind((request.second)->nHeight, ancestor->nHeight); + request.second = ancestor; + } + if (!min_start || min_start->nHeight > (request.second)->nHeight) { + min_start = request.second; + } + } + if (min_start) { + // Read next block and send notifications. + const CBlockIndex* next = nullptr; + { + LOCK(cs_main); + if (min_start->nHeight > 0) { + assert(min_start->pprev); + next = ChainActive().Next(min_start->pprev); + } else { + next = min_start; + } + } + if (next) { + CBlock block; + ReadBlockFromDisk(block, next, consensus_params); + for (auto& request : m_request_start) { + (request.first)->BlockConnected(block, {}, next->nHeight, next->GetUndoPos()); + request.second = next; + // To avoid any race condition where callback would miss block connection, + // compare against tip and register validation interface in one sequence + LOCK(cs_main); + CBlockIndex* tip = ChainActive().Tip(); + CBlockIndex* pindex = LookupBlockIndex(block.GetHash()); + if (tip->GetBlockHash() == pindex->GetBlockHash()) { + (request.first)->UpdatedBlockTip(); // If client need to flush its state we signal tip is reached + (request.first)->HandleNotifications(); + m_request_start.erase(request.first); + } + } + } + } + } + // Be nice, let's requesters who need it, commit their database before to leave + for (auto& request : m_request_start) { + if (!request.second) continue; + LOCK(cs_main); + CBlockLocator locator = ::ChainActive().GetLocator(request.second); + (request.first)->ChainStateFlushed(locator); + } +} + +void Rescan::AddRequest(interfaces::Chain::Notifications& callback, const CBlockLocator& locator) +{ + LOCK(cs_main); + // If fork is superior at MAX_LOCATOR_SZ, rescan is going to be processed + // from genesis. If reorg of this size happens, rescan performance hit + // isn't the main problem. + m_request_start[&callback] = FindForkInGlobalIndex(::ChainActive(), locator); +} + +void Rescan::StartServiceRequests() +{ + threadServiceRequests = std::thread(&TraceThread>, "rescan", std::bind(&Rescan::ThreadServiceRequests, this)); +} + +void Rescan::InterruptServiceRequests() +{ + m_interrupt(); +} + +void Rescan::StopServiceRequests() +{ + if (threadServiceRequests.joinable()) { + threadServiceRequests.join(); + } +} diff --git a/src/node/rescan.h b/src/node/rescan.h new file mode 100644 index 0000000000000..d0b0f8c1c264c --- /dev/null +++ b/src/node/rescan.h @@ -0,0 +1,48 @@ +// Copyright (c) 2017-2019 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_NODE_RESCAN_H +#define BITCOIN_NODE_RESCAN_H + +#include +#include + +class CBlockIndex; + +//! Wallets or indexes can be fallen-behind chain tip at restart. They need +//! to learn about blocks connected during their shutdown to update their +//! internal states accordingly. If we have many wallets or indexes, each +//! of them is going to read the same scan range repeatedly out of order +//! instead of just once in order. To avoid that, Rescan accept +//! rescan request at initialization and fulfill all of them in a thread +//! ThreadServiceRequests started/stopped at init/shutdown sequences. +class Rescan +{ +private: + std::map m_request_start; + //TODO: Future plan is to use a thread pool with multiple workers to process + // rescan requests in parallel beyond clients loading + std::thread threadServiceRequests; + CThreadInterrupt m_interrupt; + + //! Read blocks in sequence, consolidating rescan requests and send notifications in sequence + //! to all requesters. + void ThreadServiceRequests(); + +public: + //! Add rescan request, using the passed callback handler to reditect block to, starting from + //! locator. + void AddRequest(interfaces::Chain::Notifications& callback, const CBlockLocator& locator); + + //! Start thread worker to replay blocks for registered requester. + void StartServiceRequests(); + + //! Interrupt thread worker replaying blocks. + void InterruptServiceRequests(); + + //! Stop thread worker replaying blocks. + void StopServiceRequests(); +}; + +#endif // BITCOIN_NODE_RESCAN_H From 27d302631adc4845f7ade78410805110f438b7f5 Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Thu, 8 Aug 2019 14:12:03 -0400 Subject: [PATCH 7/9] Extend Chain interface to register/start/stop notifications Chain interface callers may register for notifications while initializing themselves. Init code starts notification delivery, shutdown code is responsible to stop it. Notification delivery runs in its own thread (ThreadServiceRequests) during rescan period then rely on CValidationInterface. --- src/interfaces/chain.cpp | 19 +++++++++++++++++++ src/interfaces/chain.h | 13 +++++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp index c2933b4bd5a44..3dd0d86164831 100644 --- a/src/interfaces/chain.cpp +++ b/src/interfaces/chain.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -244,6 +245,8 @@ class RpcHandlerImpl : public Handler class ChainImpl : public Chain { +private: + Rescan m_rescan; public: std::unique_ptr lock(bool try_lock) override { @@ -350,6 +353,22 @@ class ChainImpl : public Chain { return MakeUnique(*this, notifications); } + void startNotifications() override + { + m_rescan.StartServiceRequests(); + } + void interruptNotifications() override + { + m_rescan.InterruptServiceRequests(); + } + void stopNotifications() override + { + m_rescan.StopServiceRequests(); + } + void registerNotifications(Notifications& callback, const CBlockLocator& locator) override + { + m_rescan.AddRequest(callback, locator); + } void waitForNotificationsIfNewBlocksConnected(const uint256& old_tip) override { if (!old_tip.IsNull()) { diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index d2ad9d4067786..9d9eb72974731 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -21,6 +21,7 @@ class CRPCCommand; class CScheduler; class CValidationState; class Coin; +class Rescan; class uint256; enum class RBFTransactionState; struct CBlockLocator; @@ -243,6 +244,18 @@ class Chain //! Register handler for notifications. virtual std::unique_ptr handleNotifications(Notifications& notifications) = 0; + //! Start notifications sequence in order for every ChainClient, used once by init code. + virtual void startNotifications() = 0; + + //! Stop notifications sequence, used once by shutdown code. + virtual void stopNotifications() = 0; + + //! Interrupt notification sequence, used once by interrupt code. + virtual void interruptNotifications() = 0; + + //! Request notifications. + virtual void registerNotifications(interfaces::Chain::Notifications& callback, const CBlockLocator& locator) = 0; + //! Wait for pending notifications to be processed unless block hash points to the current //! chain tip, or to a possible descendant of the current chain tip that isn't currently //! connected. From 50845ec618aa88dcd8eafa24bc4ad61e49f8e40b Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Thu, 8 Aug 2019 15:44:53 -0400 Subject: [PATCH 8/9] Manage starting/interrupt/stop of ThreadServiceRequests at init/shutdown --- src/bitcoind.cpp | 8 ++++---- src/init.cpp | 12 +++++++++--- src/init.h | 2 +- src/interfaces/node.cpp | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/bitcoind.cpp b/src/bitcoind.cpp index 8e31f6e32b5e9..c44c3d3cb18e0 100644 --- a/src/bitcoind.cpp +++ b/src/bitcoind.cpp @@ -43,13 +43,13 @@ const std::function G_TRANSLATION_FUN = nullptr; * Use the buttons Namespaces, Classes or Files at the top of the page to start navigating the code. */ -static void WaitForShutdown() +static void WaitForShutdown(InitInterfaces& interfaces) { while (!ShutdownRequested()) { MilliSleep(200); } - Interrupt(); + Interrupt(interfaces); } ////////////////////////////////////////////////////////////////////////////// @@ -172,9 +172,9 @@ static bool AppInit(int argc, char* argv[]) if (!fRet) { - Interrupt(); + Interrupt(interfaces); } else { - WaitForShutdown(); + WaitForShutdown(interfaces); } Shutdown(interfaces); diff --git a/src/init.cpp b/src/init.cpp index b84c7dc93d138..8eacec1726084 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -155,7 +155,7 @@ static std::unique_ptr globalVerifyHandle; static boost::thread_group threadGroup; static CScheduler scheduler; -void Interrupt() +void Interrupt(InitInterfaces& interfaces) { InterruptHTTPServer(); InterruptHTTPRPC(); @@ -169,6 +169,7 @@ void Interrupt() g_txindex->Interrupt(); } ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Interrupt(); }); + interfaces.chain->interruptNotifications(); } void Shutdown(InitInterfaces& interfaces) @@ -201,6 +202,7 @@ void Shutdown(InitInterfaces& interfaces) if (g_connman) g_connman->Stop(); if (g_txindex) g_txindex->Stop(); ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Stop(); }); + interfaces.chain->stopNotifications(); StopTorControl(); @@ -1727,7 +1729,11 @@ bool AppInitMain(InitInterfaces& interfaces) return false; } - // ********************************************************* Step 12: start node + // ********************************************************* Step 12: start servicing rescan requests + + interfaces.chain->startNotifications(); + + // ********************************************************* Step 13: start node int chain_active_height; @@ -1807,7 +1813,7 @@ bool AppInitMain(InitInterfaces& interfaces) return false; } - // ********************************************************* Step 13: finished + // ********************************************************* Step 14: finished SetRPCWarmupFinished(); uiInterface.InitMessage(_("Done loading").translated); diff --git a/src/init.h b/src/init.h index 1c59ca069edf2..01e1a24e397fb 100644 --- a/src/init.h +++ b/src/init.h @@ -28,7 +28,7 @@ class thread_group; } // namespace boost /** Interrupt threads */ -void Interrupt(); +void Interrupt(InitInterfaces& interfaces); void Shutdown(InitInterfaces& interfaces); //!Initialize the logging infrastructure void InitLogging(); diff --git a/src/interfaces/node.cpp b/src/interfaces/node.cpp index fd2fb6531b50d..34369899d41c5 100644 --- a/src/interfaces/node.cpp +++ b/src/interfaces/node.cpp @@ -78,7 +78,7 @@ class NodeImpl : public Node bool appInitMain() override { return AppInitMain(m_interfaces); } void appShutdown() override { - Interrupt(); + Interrupt(m_interfaces); Shutdown(m_interfaces); } void startShutdown() override { StartShutdown(); } From 1ff4272b8271f671a50497665d4a59b9e0b125de Mon Sep 17 00:00:00 2001 From: Antoine Riard Date: Mon, 19 Aug 2019 14:45:19 -0400 Subject: [PATCH 9/9] Move BaseIndex to a Chain::Notifications client model Instead of being direct consumers of the CValidationInterface indexes are moved as clients of Chain interface. Main benefit is to rescan in one sequence for all chain clients at initialization. Another benefit is to separate index from accessing node internals like direct chain query. This commit moves in this direction doesn't make separation complete. To do so, we adapt BaseIndex and its subclasses to Chain::Notifications model, remove CValidationInterface parts from BaseIndex, remove thread sync. --- src/index/base.cpp | 275 ++++++++------------------------- src/index/base.h | 60 +++---- src/index/blockfilterindex.cpp | 38 +++-- src/index/blockfilterindex.h | 12 +- src/index/txindex.cpp | 11 +- src/index/txindex.h | 9 +- src/init.cpp | 14 +- 7 files changed, 131 insertions(+), 288 deletions(-) diff --git a/src/index/base.cpp b/src/index/base.cpp index bcc8e2ce7c68c..eabee0d7f4a29 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -46,12 +46,6 @@ void BaseIndex::DB::WriteBestBlock(CDBBatch& batch, const CBlockLocator& locator batch.Write(DB_BEST_BLOCK, locator); } -BaseIndex::~BaseIndex() -{ - Interrupt(); - Stop(); -} - bool BaseIndex::Init() { CBlockLocator locator; @@ -59,103 +53,15 @@ bool BaseIndex::Init() locator.SetNull(); } - LOCK(cs_main); - if (locator.IsNull()) { - m_best_block_index = nullptr; - } else { - m_best_block_index = FindForkInGlobalIndex(::ChainActive(), locator); + if (!locator.IsNull()) { + LOCK(cs_main); + CBlockIndex* pindex = FindForkInGlobalIndex(::ChainActive(), locator); + m_last_block_processed_height = pindex->nHeight; } - m_synced = m_best_block_index.load() == ::ChainActive().Tip(); + m_chain->registerNotifications(*this, const_cast(locator)); return true; } -static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - AssertLockHeld(cs_main); - - if (!pindex_prev) { - return ::ChainActive().Genesis(); - } - - const CBlockIndex* pindex = ::ChainActive().Next(pindex_prev); - if (pindex) { - return pindex; - } - - return ::ChainActive().Next(::ChainActive().FindFork(pindex_prev)); -} - -void BaseIndex::ThreadSync() -{ - const CBlockIndex* pindex = m_best_block_index.load(); - if (!m_synced) { - auto& consensus_params = Params().GetConsensus(); - - int64_t last_log_time = 0; - int64_t last_locator_write_time = 0; - while (true) { - if (m_interrupt) { - m_best_block_index = pindex; - // No need to handle errors in Commit. If it fails, the error will be already be - // logged. The best way to recover is to continue, as index cannot be corrupted by - // a missed commit to disk for an advanced index state. - Commit(); - return; - } - - { - LOCK(cs_main); - const CBlockIndex* pindex_next = NextSyncBlock(pindex); - if (!pindex_next) { - m_best_block_index = pindex; - m_synced = true; - // No need to handle errors in Commit. See rationale above. - Commit(); - break; - } - if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) { - FatalError("%s: Failed to rewind index %s to a previous chain tip", - __func__, GetName()); - return; - } - pindex = pindex_next; - } - - int64_t current_time = GetTime(); - if (last_log_time + SYNC_LOG_INTERVAL < current_time) { - LogPrintf("Syncing %s with block chain from height %d\n", - GetName(), pindex->nHeight); - last_log_time = current_time; - } - - if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) { - m_best_block_index = pindex; - last_locator_write_time = current_time; - // No need to handle errors in Commit. See rationale above. - Commit(); - } - - CBlock block; - if (!ReadBlockFromDisk(block, pindex, consensus_params)) { - FatalError("%s: Failed to read block %s from disk", - __func__, pindex->GetBlockHash().ToString()); - return; - } - if (!WriteBlock(block, pindex)) { - FatalError("%s: Failed to write block %s to index database", - __func__, pindex->GetBlockHash().ToString()); - return; - } - } - } - - if (pindex) { - LogPrintf("%s is enabled at height %d\n", GetName(), pindex->nHeight); - } else { - LogPrintf("%s is enabled\n", GetName()); - } -} - bool BaseIndex::Commit() { CDBBatch batch(GetDB()); @@ -168,156 +74,97 @@ bool BaseIndex::Commit() bool BaseIndex::CommitInternal(CDBBatch& batch) { LOCK(cs_main); - GetDB().WriteBestBlock(batch, ::ChainActive().GetLocator(m_best_block_index)); + const CBlockIndex *pindex = ::ChainActive()[m_last_block_processed_height]; + GetDB().WriteBestBlock(batch, ::ChainActive().GetLocator(pindex)); return true; } -bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) +void BaseIndex::Rewind(int forked_height, int ancestor_height) { - assert(current_tip == m_best_block_index); - assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip); + assert(forked_height == m_last_block_processed_height); // In the case of a reorg, ensure persisted block locator is not stale. - m_best_block_index = new_tip; + m_last_block_processed_height = ancestor_height; if (!Commit()) { - // If commit fails, revert the best block index to avoid corruption. - m_best_block_index = current_tip; - return false; + // If commit fails, revert the best processed height to avoid corruption. + m_last_block_processed_height = forked_height; } - - return true; } -void BaseIndex::BlockConnected(const std::shared_ptr& block, const CBlockIndex* pindex, - const std::vector& txn_conflicted) +void BaseIndex::BlockConnected(const CBlock& block, const std::vector& txn_conflicted, + int height, FlatFilePos block_pos) { - if (!m_synced) { + if (m_last_block_processed_height == -1 && height != 0) { + FatalError("%s: First block connected is not the genesis block (height=%d)", + __func__, height); return; } + // In the new model, if we are relying on ThreadServiceRequests to get block connection, in case of fork, + // we are going to Rewind and restart rescan from then. If we rely on ValidationInterface (i.e we reach + // tip at least once), we should receive BlockDisconnected event. In case of reorg, we don't overwrite + // data committed in data base, so may have false elements but we don't miss right ones. - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (!best_block_index) { - if (pindex->nHeight != 0) { - FatalError("%s: First block connected is not the genesis block (height=%d)", - __func__, pindex->nHeight); - return; - } - } else { - // Ensure block connects to an ancestor of the current best block. This should be the case - // most of the time, but may not be immediately after the sync thread catches up and sets - // m_synced. Consider the case where there is a reorg and the blocks on the stale branch are - // in the ValidationInterface queue backlog even after the sync thread has caught up to the - // new chain tip. In this unlikely event, log a warning and let the queue clear. - if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) { - LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */ - "known best chain (tip=%s); not updating index\n", - __func__, pindex->GetBlockHash().ToString(), - best_block_index->GetBlockHash().ToString()); - return; - } - if (best_block_index != pindex->pprev && !Rewind(best_block_index, pindex->pprev)) { - FatalError("%s: Failed to rewind index %s to a previous chain tip", - __func__, GetName()); - return; - } - } - - if (WriteBlock(*block, pindex)) { - m_best_block_index = pindex; + if (WriteBlock(block, height, block_pos, m_last_block_processed)) { + m_last_block_processed_height = height; + m_last_block_processed = block.GetBlockHeader().GetHash(); } else { FatalError("%s: Failed to write block %s to index", - __func__, pindex->GetBlockHash().ToString()); + __func__, block.GetBlockHeader().GetHash().ToString()); return; } + // To avoid performance hit, we flush every SYNC_LOCATOR_WRITE_INTERVAL until catch up to tip, + // then after every block connection. + if (m_synced) { + int64_t current_time = GetTime(); + if (m_last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) { + m_last_locator_write_time = current_time; + // No need to handle errors in Commit. If it fails, the error will be already be + // logged. The best way to recover is to continue, as index cannot be corrupted by + // a missed commit to disk for an advanced index state. + Commit(); + } + } else { + Commit(); + } } + void BaseIndex::ChainStateFlushed(const CBlockLocator& locator) { - if (!m_synced) { - return; - } - - const uint256& locator_tip_hash = locator.vHave.front(); - const CBlockIndex* locator_tip_index; - { - LOCK(cs_main); - locator_tip_index = LookupBlockIndex(locator_tip_hash); - } - - if (!locator_tip_index) { - FatalError("%s: First block (hash=%s) in locator was not found", - __func__, locator_tip_hash.ToString()); - return; - } - - // This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail - // immediately after the sync thread catches up and sets m_synced. Consider the case where - // there is a reorg and the blocks on the stale branch are in the ValidationInterface queue - // backlog even after the sync thread has caught up to the new chain tip. In this unlikely - // event, log a warning and let the queue clear. - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) { - LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */ - "chain (tip=%s); not writing index locator\n", - __func__, locator_tip_hash.ToString(), - best_block_index->GetBlockHash().ToString()); - return; - } - - // No need to handle errors in Commit. If it fails, the error will be already be logged. The - // best way to recover is to continue, as index cannot be corrupted by a missed commit to disk - // for an advanced index state. + // No need to handle errors in Commit. If it fails, the error will be already be + // logged. The best way to recover is to continue, as index cannot be corrupted by + // a missed commit to disk for an advanced index state. Commit(); } -bool BaseIndex::BlockUntilSyncedToCurrentChain() +void BaseIndex::BlockDisconnected(const CBlock& block, int height) { - AssertLockNotHeld(cs_main); - - if (!m_synced) { - return false; - } - - { - // Skip the queue-draining stuff if we know we're caught up with - // ::ChainActive().Tip(). - LOCK(cs_main); - const CBlockIndex* chain_tip = ::ChainActive().Tip(); - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) { - return true; - } - } - - LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName()); - SyncWithValidationInterfaceQueue(); - return true; + Rewind(height, height - 1); + // If commit fails in Rewind, don't update last block processed to avoid corruption + if (m_last_block_processed_height != height) m_last_block_processed = block.hashPrevBlock; } -void BaseIndex::Interrupt() +void BaseIndex::UpdatedBlockTip() { - m_interrupt(); + // Starting from now, index synchronization state relies on CValidationInterface and not + // on ThreadServiceRequest anymore. + m_synced = true; } -void BaseIndex::Start() +void BaseIndex::HandleNotifications() { - // Need to register this ValidationInterface before running Init(), so that - // callbacks are not missed if Init sets m_synced to true. - RegisterValidationInterface(this); - if (!Init()) { - FatalError("%s: %s failed to initialize", __func__, GetName()); - return; - } - - m_thread_sync = std::thread(&TraceThread>, GetName(), - std::bind(&BaseIndex::ThreadSync, this)); + m_chain_notifications_handler = m_chain->handleNotifications(*this); } -void BaseIndex::Stop() +bool BaseIndex::BlockUntilSyncedToCurrentChain() { - UnregisterValidationInterface(this); + AssertLockNotHeld(cs_main); - if (m_thread_sync.joinable()) { - m_thread_sync.join(); + // If index wasn't synced at least once until tip, it's not receiving yet updates + // from validation interface, no need to hold. + if (!m_synced) { + return false; } + m_chain->waitForNotificationsIfNewBlocksConnected(m_last_block_processed); + return true; } diff --git a/src/index/base.h b/src/index/base.h index 31acbed0c186a..ce47243965c48 100644 --- a/src/index/base.h +++ b/src/index/base.h @@ -6,6 +6,8 @@ #define BITCOIN_INDEX_BASE_H #include +#include +#include #include #include #include @@ -19,7 +21,7 @@ class CBlockIndex; * CValidationInterface and ensures blocks are indexed sequentially according * to their position in the active chain. */ -class BaseIndex : public CValidationInterface +class BaseIndex : private interfaces::Chain::Notifications { protected: class DB : public CDBWrapper @@ -41,18 +43,20 @@ class BaseIndex : public CValidationInterface /// ValidationInterface notifications to stay in sync. std::atomic m_synced{false}; - /// The last block in the chain that the index is in sync with. - std::atomic m_best_block_index{nullptr}; + /** Height of last block processed */ + int m_last_block_processed_height = -1; - std::thread m_thread_sync; - CThreadInterrupt m_interrupt; + /** Hash of last block processed */ + uint256 m_last_block_processed; - /// Sync the index with the block index starting from the current best block. - /// Intended to be run in its own thread, m_thread_sync, and can be - /// interrupted with m_interrupt. Once the index gets in sync, the m_synced - /// flag is set and the BlockConnected ValidationInterface callback takes - /// over and the sync thread exits. - void ThreadSync(); + /** Last time we write locator on disk */ + int64_t m_last_locator_write_time = 0; + + /** Interface for accessing chain state. */ + interfaces::Chain* m_chain; + + /** Registered interfaces::Chain::Notifications handler. */ + std::unique_ptr m_chain_notifications_handler; /// Write the current index state (eg. chain block locator and subclass-specific items) to disk. /// @@ -64,34 +68,39 @@ class BaseIndex : public CValidationInterface /// getting corrupted. bool Commit(); -protected: - void BlockConnected(const std::shared_ptr& block, const CBlockIndex* pindex, - const std::vector& txn_conflicted) override; + + void BlockConnected(const CBlock& block, const std::vector& txn_conflicted, int height, FlatFilePos block_pos) override; + + void BlockDisconnected(const CBlock& block, int height) override; + + void UpdatedBlockTip() override; void ChainStateFlushed(const CBlockLocator& locator) override; + void HandleNotifications() override; + +protected: + + explicit BaseIndex(interfaces::Chain& chain) : m_chain(&chain) {} + + void Rewind(int forked_height, int ancestor_height); /// Initialize internal state from the database and block index. virtual bool Init(); /// Write update index entries for a newly connected block. virtual bool WriteBlock(const CBlock& block, const CBlockIndex* pindex) { return true; } - + /// Write update index entries for a newly connected block. + virtual bool WriteBlock(const CBlock& block, int height, const FlatFilePos undo_pos, uint256& prev_block) { return true; } /// Virtual method called internally by Commit that can be overridden to atomically /// commit more index state. virtual bool CommitInternal(CDBBatch& batch); - /// Rewind index to an earlier chain tip during a chain reorg. The tip must - /// be an ancestor of the current best block. - virtual bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip); - virtual DB& GetDB() const = 0; /// Get the name of the index for display in logs. virtual const char* GetName() const = 0; public: - /// Destructor interrupts sync thread if running and blocks until it exits. - virtual ~BaseIndex(); /// Blocks the current thread until the index is caught up to the current /// state of the block chain. This only blocks if the index has gotten in @@ -99,15 +108,6 @@ class BaseIndex : public CValidationInterface /// queue. If the index is catching up from far behind, this method does /// not block and immediately returns false. bool BlockUntilSyncedToCurrentChain(); - - void Interrupt(); - - /// Start initializes the sync state and registers the instance as a - /// ValidationInterface so that it stays in sync with blockchain updates. - void Start(); - - /// Stops the instance from staying in sync with blockchain updates. - void Stop(); }; #endif // BITCOIN_INDEX_BASE_H diff --git a/src/index/blockfilterindex.cpp b/src/index/blockfilterindex.cpp index 979649fc6e2b4..3faf76ed74328 100644 --- a/src/index/blockfilterindex.cpp +++ b/src/index/blockfilterindex.cpp @@ -96,9 +96,10 @@ struct DBHashKey { static std::map g_filter_indexes; -BlockFilterIndex::BlockFilterIndex(BlockFilterType filter_type, +BlockFilterIndex::BlockFilterIndex(BlockFilterType filter_type, interfaces::Chain& chain, size_t n_cache_size, bool f_memory, bool f_wipe) - : m_filter_type(filter_type) + : BaseIndex(chain), + m_filter_type(filter_type) { const std::string& filter_name = BlockFilterTypeName(filter_type); if (filter_name.empty()) throw std::invalid_argument("unknown filter_type"); @@ -212,25 +213,24 @@ size_t BlockFilterIndex::WriteFilterToDisk(FlatFilePos& pos, const BlockFilter& return data_size; } -bool BlockFilterIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) +bool BlockFilterIndex::WriteBlock(const CBlock& block, int height, FlatFilePos undo_pos, uint256& prev_hash) { CBlockUndo block_undo; uint256 prev_header; - if (pindex->nHeight > 0) { - if (!UndoReadFromDisk(block_undo, pindex->GetUndoPos(), pindex->GetBlockHash())) { + if (height > 0) { + if (!UndoReadFromDisk(block_undo, undo_pos, prev_hash)) { return false; } std::pair read_out; - if (!m_db->Read(DBHeightKey(pindex->nHeight - 1), read_out)) { + if (!m_db->Read(DBHeightKey(height - 1), read_out)) { return false; } - uint256 expected_block_hash = pindex->pprev->GetBlockHash(); - if (read_out.first != expected_block_hash) { + if (read_out.first != prev_hash) { return error("%s: previous block header belongs to unexpected block %s; expected %s", - __func__, read_out.first.ToString(), expected_block_hash.ToString()); + __func__, read_out.first.ToString(), prev_hash.ToString()); } prev_header = read_out.second.header; @@ -242,12 +242,12 @@ bool BlockFilterIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex if (bytes_written == 0) return false; std::pair value; - value.first = pindex->GetBlockHash(); + value.first = block.GetBlockHeader().GetHash(); value.second.hash = filter.GetHash(); value.second.header = filter.ComputeHeader(prev_header); value.second.pos = m_next_filter_pos; - if (!m_db->Write(DBHeightKey(pindex->nHeight), value)) { + if (!m_db->Write(DBHeightKey(height), value)) { return false; } @@ -281,27 +281,25 @@ static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch, return true; } -bool BlockFilterIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) +void BlockFilterIndex::Rewind(int forked_height, int ancestor_height) { - assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip); - CDBBatch batch(*m_db); std::unique_ptr db_it(m_db->NewIterator()); // During a reorg, we need to copy all filters for blocks that are getting disconnected from the // height index to the hash index so we can still find them when the height index entries are // overwritten. - if (!CopyHeightIndexToHashIndex(*db_it, batch, m_name, new_tip->nHeight, current_tip->nHeight)) { - return false; + if (!CopyHeightIndexToHashIndex(*db_it, batch, m_name, ancestor_height, forked_height)) { + return ; } // The latest filter position gets written in Commit by the call to the BaseIndex::Rewind. // But since this creates new references to the filter, the position should get updated here // atomically as well in case Commit fails. batch.Write(DB_FILTER_POS, m_next_filter_pos); - if (!m_db->WriteBatch(batch)) return false; + if (!m_db->WriteBatch(batch)) return ; - return BaseIndex::Rewind(current_tip, new_tip); + return BaseIndex::Rewind(forked_height, ancestor_height); } static bool LookupOne(const CDBWrapper& db, const CBlockIndex* block_index, DBVal& result) @@ -446,12 +444,12 @@ void ForEachBlockFilterIndex(std::function fn) for (auto& entry : g_filter_indexes) fn(entry.second); } -bool InitBlockFilterIndex(BlockFilterType filter_type, +bool InitBlockFilterIndex(BlockFilterType filter_type, interfaces::Chain& chain, size_t n_cache_size, bool f_memory, bool f_wipe) { auto result = g_filter_indexes.emplace(std::piecewise_construct, std::forward_as_tuple(filter_type), - std::forward_as_tuple(filter_type, + std::forward_as_tuple(filter_type, chain, n_cache_size, f_memory, f_wipe)); return result.second; } diff --git a/src/index/blockfilterindex.h b/src/index/blockfilterindex.h index 436d52515f490..21534a1460b53 100644 --- a/src/index/blockfilterindex.h +++ b/src/index/blockfilterindex.h @@ -31,21 +31,23 @@ class BlockFilterIndex final : public BaseIndex size_t WriteFilterToDisk(FlatFilePos& pos, const BlockFilter& filter); protected: - bool Init() override; bool CommitInternal(CDBBatch& batch) override; - bool WriteBlock(const CBlock& block, const CBlockIndex* pindex) override; + bool WriteBlock(const CBlock& block, int height, FlatFilePos undo_pos, uint256& prev_hash) override; - bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) override; + void Rewind(int forked_height, int ancestor_height) override; BaseIndex::DB& GetDB() const override { return *m_db; } const char* GetName() const override { return m_name.c_str(); } public: + /// Initialize internal state from the database and block index. + bool Init() override; + /** Constructs the index, which becomes available to be queried. */ - explicit BlockFilterIndex(BlockFilterType filter_type, + explicit BlockFilterIndex(BlockFilterType filter_type, interfaces::Chain& chain, size_t n_cache_size, bool f_memory = false, bool f_wipe = false); BlockFilterType GetFilterType() const { return m_filter_type; } @@ -78,7 +80,7 @@ void ForEachBlockFilterIndex(std::function fn); * Initialize a block filter index for the given type if one does not already exist. Returns true if * a new index is created and false if one has already been initialized. */ -bool InitBlockFilterIndex(BlockFilterType filter_type, +bool InitBlockFilterIndex(BlockFilterType filter_type, interfaces::Chain& chain, size_t n_cache_size, bool f_memory = false, bool f_wipe = false); /** diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 62db38f8943df..81f11fd60d775 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -224,8 +224,9 @@ bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator& return true; } -TxIndex::TxIndex(size_t n_cache_size, bool f_memory, bool f_wipe) - : m_db(MakeUnique(n_cache_size, f_memory, f_wipe)) +TxIndex::TxIndex(interfaces::Chain& chain, size_t n_cache_size, bool f_memory, bool f_wipe) + : BaseIndex(chain), + m_db(MakeUnique(n_cache_size, f_memory, f_wipe)) {} TxIndex::~TxIndex() {} @@ -244,12 +245,12 @@ bool TxIndex::Init() return BaseIndex::Init(); } -bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) +bool TxIndex::WriteBlock(const CBlock& block, int height, FlatFilePos block_pos, uint256& prev_hash) { // Exclude genesis block transaction because outputs are not spendable. - if (pindex->nHeight == 0) return true; + if (height == 0) return true; - CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); + CDiskTxPos pos(block_pos, GetSizeOfCompactSize(block.vtx.size())); std::vector> vPos; vPos.reserve(block.vtx.size()); for (const auto& tx : block.vtx) { diff --git a/src/index/txindex.h b/src/index/txindex.h index 8202c3c951f12..62b6fb88f5e41 100644 --- a/src/index/txindex.h +++ b/src/index/txindex.h @@ -23,18 +23,19 @@ class TxIndex final : public BaseIndex const std::unique_ptr m_db; protected: - /// Override base class init to migrate from old database. - bool Init() override; - bool WriteBlock(const CBlock& block, const CBlockIndex* pindex) override; + bool WriteBlock(const CBlock& block, int height, FlatFilePos undo_pos, uint256& prev_hash) override; BaseIndex::DB& GetDB() const override; const char* GetName() const override { return "txindex"; } public: + /// Override base class init to migrate from old database. + bool Init() override; + /// Constructs the index, which becomes available to be queried. - explicit TxIndex(size_t n_cache_size, bool f_memory = false, bool f_wipe = false); + explicit TxIndex(interfaces::Chain& chain, size_t n_cache_size, bool f_memory = false, bool f_wipe = false); // Destructor is declared because this class contains a unique_ptr to an incomplete type. virtual ~TxIndex() override; diff --git a/src/init.cpp b/src/init.cpp index 8eacec1726084..486e723430c92 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -165,10 +165,6 @@ void Interrupt(InitInterfaces& interfaces) InterruptMapPort(); if (g_connman) g_connman->Interrupt(); - if (g_txindex) { - g_txindex->Interrupt(); - } - ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Interrupt(); }); interfaces.chain->interruptNotifications(); } @@ -200,8 +196,6 @@ void Shutdown(InitInterfaces& interfaces) // using the other before destroying them. if (peerLogic) UnregisterValidationInterface(peerLogic.get()); if (g_connman) g_connman->Stop(); - if (g_txindex) g_txindex->Stop(); - ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Stop(); }); interfaces.chain->stopNotifications(); StopTorControl(); @@ -1643,13 +1637,13 @@ bool AppInitMain(InitInterfaces& interfaces) // ********************************************************* Step 8: start indexers if (gArgs.GetBoolArg("-txindex", DEFAULT_TXINDEX)) { - g_txindex = MakeUnique(nTxIndexCache, false, fReindex); - g_txindex->Start(); + g_txindex = MakeUnique(*interfaces.chain, nTxIndexCache, false, fReindex); + g_txindex->Init(); } for (const auto& filter_type : g_enabled_filter_types) { - InitBlockFilterIndex(filter_type, filter_index_cache, false, fReindex); - GetBlockFilterIndex(filter_type)->Start(); + InitBlockFilterIndex(filter_type, *interfaces.chain, filter_index_cache, false, fReindex); + GetBlockFilterIndex(filter_type)->Init(); } // ********************************************************* Step 9: load wallet