From a7c9bd99df837f0b8b07f893631dcb1436959556 Mon Sep 17 00:00:00 2001 From: arvidn Date: Thu, 7 Nov 2024 13:11:35 +0100 Subject: [PATCH] simplify pre_validate_blocks_multiprocessing() by making it pre-validated one block at a time. That's how we use it in production code anyway --- .../blockchain/blockchain_test_utils.py | 13 +- chia/_tests/blockchain/test_blockchain.py | 229 +++++++++--------- chia/_tests/core/full_node/test_full_node.py | 45 ++-- .../test_third_party_harvesters.py | 23 +- chia/consensus/multiprocess_validation.py | 181 +++++++------- chia/full_node/full_node.py | 57 ++--- chia/simulator/full_node_simulator.py | 24 +- 7 files changed, 289 insertions(+), 283 deletions(-) diff --git a/chia/_tests/blockchain/blockchain_test_utils.py b/chia/_tests/blockchain/blockchain_test_utils.py index 3104f5301528..c9b865a84ea1 100644 --- a/chia/_tests/blockchain/blockchain_test_utils.py +++ b/chia/_tests/blockchain/blockchain_test_utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio from typing import Optional from chia_rs import SpendBundleConditions @@ -8,7 +7,7 @@ from chia.consensus.block_body_validation import ForkInfo from chia.consensus.blockchain import AddBlockResult, Blockchain from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.types.full_block import FullBlock from chia.types.validation_state import ValidationState from chia.util.augmented_chain import AugmentedBlockchain @@ -81,17 +80,15 @@ async def _validate_and_add_block( conds = SpendBundleConditions([], 0, 0, 0, None, None, [], 0, 0, 0, True) results = PreValidationResult(None, uint64(1), conds, uint32(0)) else: - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( blockchain.constants, AugmentedBlockchain(blockchain), - [block], + block, blockchain.pool, - {}, + None, ValidationState(ssi, diff, prev_ses_block), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert pre_validation_results is not None - results = pre_validation_results[0] + results = await future if results.error is not None: if expected_result == AddBlockResult.INVALID_BLOCK and expected_error is None: # We expected an error but didn't specify which one diff --git a/chia/_tests/blockchain/test_blockchain.py b/chia/_tests/blockchain/test_blockchain.py index d0ec491ab5c4..06f7f864710b 100644 --- a/chia/_tests/blockchain/test_blockchain.py +++ b/chia/_tests/blockchain/test_blockchain.py @@ -7,7 +7,7 @@ from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import replace -from typing import Optional +from typing import Awaitable, Optional import pytest from chia_rs import AugSchemeMPL, G2Element, MerkleSet @@ -32,7 +32,7 @@ from chia.consensus.find_fork_point import lookup_fork_chain from chia.consensus.full_block_to_block_record import block_to_block_record from chia.consensus.get_block_generator import get_block_generator -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.consensus.pot_iterations import is_overflow_block from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions from chia.simulator.block_tools import BlockTools, create_block_tools_async @@ -54,7 +54,6 @@ from chia.types.unfinished_block import UnfinishedBlock from chia.types.validation_state import ValidationState from chia.util.augmented_chain import AugmentedBlockchain -from chia.util.cpu import available_logical_cores from chia.util.errors import Err from chia.util.generator_tools import get_block_header from chia.util.hash import std_hash @@ -1792,14 +1791,20 @@ async def test_pre_validation_fails_bad_blocks(self, empty_blockchain: Blockchai block_bad = recursive_replace( blocks[-1], "reward_chain_block.total_iters", blocks[-1].reward_chain_block.total_iters + 1 ) - futures = await pre_validate_blocks_multiprocessing( - empty_blockchain.constants, - AugmentedBlockchain(empty_blockchain), - [blocks[0], block_bad], - empty_blockchain.pool, - {}, - ValidationState(ssi, difficulty, None), - ) + futures = [] + vs = ValidationState(ssi, difficulty, None) + chain = AugmentedBlockchain(empty_blockchain) + for block in [blocks[0], block_bad]: + futures.append( + await pre_validate_block( + empty_blockchain.constants, + chain, + block, + empty_blockchain.pool, + None, + vs, + ) + ) res: list[PreValidationResult] = list(await asyncio.gather(*futures)) assert res[0].error is None assert res[1].error is not None @@ -1810,46 +1815,39 @@ async def test_pre_validation( ) -> None: blocks = default_1000_blocks[:100] start = time.time() - n_at_a_time = min(available_logical_cores(), 32) - times_pv = [] - times_rb = [] ssi = empty_blockchain.constants.SUB_SLOT_ITERS_STARTING difficulty = empty_blockchain.constants.DIFFICULTY_STARTING - for i in range(0, len(blocks), n_at_a_time): - end_i = min(i + n_at_a_time, len(blocks)) - blocks_to_validate = blocks[i:end_i] - start_pv = time.time() - futures = await pre_validate_blocks_multiprocessing( - empty_blockchain.constants, - AugmentedBlockchain(empty_blockchain), - blocks_to_validate, - empty_blockchain.pool, - {}, - ValidationState(ssi, difficulty, None), - ) - res: list[PreValidationResult] = list(await asyncio.gather(*futures)) - end_pv = time.time() - times_pv.append(end_pv - start_pv) - assert res is not None - for n in range(end_i - i): - assert res[n] is not None - assert res[n].error is None - block = blocks_to_validate[n] - start_rb = time.time() - fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) - result, err, _ = await empty_blockchain.add_block(block, res[n], ssi, fork_info=fork_info) - end_rb = time.time() - times_rb.append(end_rb - start_rb) - assert err is None - assert result == AddBlockResult.NEW_PEAK - log.info( - f"Added block {block.height} total iters {block.total_iters} " - f"new slot? {len(block.finished_sub_slots)}, time {end_rb - start_rb}" + blockchain = AugmentedBlockchain(empty_blockchain) + vs = ValidationState(ssi, difficulty, None) + futures: list[Awaitable[PreValidationResult]] = [] + start = time.monotonic() + for block in blocks: + futures.append( + await pre_validate_block( + bt.constants, + blockchain, + block, + empty_blockchain.pool, + None, + vs, ) - end = time.time() + ) + + results = await asyncio.gather(*futures) + end = time.monotonic() + validation_time = start - end + db_start = end + + fork_info = ForkInfo(-1, -1, bt.constants.GENESIS_CHALLENGE) + for block, res in zip(blocks, results): + assert res.error is None + result, err, _ = await empty_blockchain.add_block(block, res, ssi, fork_info=fork_info) + assert err is None + assert result == AddBlockResult.NEW_PEAK + end = time.monotonic() log.info(f"Total time: {end - start} seconds") - log.info(f"Average pv: {sum(times_pv) / (len(blocks) / n_at_a_time)}") - log.info(f"Average rb: {sum(times_rb) / (len(blocks))}") + log.info(f"Average validation: {validation_time / len(blocks)}") + log.info(f"Average database: {(end - db_start) / (len(blocks))}") class TestBodyValidation: @@ -1926,18 +1924,18 @@ async def test_conditions( ) ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( + block = blocks[-1] + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [blocks[-1]], + block, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) + pre_validation_result: PreValidationResult = await future # Ignore errors from pre-validation, we are testing block_body_validation - repl_preval_results = replace(pre_validation_results[0], error=None, required_iters=uint64(1)) - block = blocks[-1] + repl_preval_results = replace(pre_validation_result, error=None, required_iters=uint64(1)) fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) code, err, state_change = await b.add_block(block, repl_preval_results, sub_slot_iters=ssi, fork_info=fork_info) assert code == AddBlockResult.NEW_PEAK @@ -2050,19 +2048,18 @@ async def test_timelock_conditions( ) ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( + block = blocks[-1] + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [blocks[-1]], + block, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert pre_validation_results is not None - block = blocks[-1] + pre_validation_result: PreValidationResult = await future fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) - assert (await b.add_block(block, pre_validation_results[0], sub_slot_iters=ssi, fork_info=fork_info))[ + assert (await b.add_block(block, pre_validation_result, sub_slot_iters=ssi, fork_info=fork_info))[ 0 ] == expected @@ -2133,18 +2130,18 @@ async def test_aggsig_garbage( ) ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( + block = blocks[-1] + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [blocks[-1]], + block, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) + pre_validation_result: PreValidationResult = await future # Ignore errors from pre-validation, we are testing block_body_validation - repl_preval_results = replace(pre_validation_results[0], error=None, required_iters=uint64(1)) - block = blocks[-1] + repl_preval_results = replace(pre_validation_result, error=None, required_iters=uint64(1)) fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) res, error, state_change = await b.add_block( block, repl_preval_results, sub_slot_iters=ssi, fork_info=fork_info @@ -2261,19 +2258,18 @@ async def test_ephemeral_timelock( ) ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( + block = blocks[-1] + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [blocks[-1]], + block, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert pre_validation_results is not None - block = blocks[-1] + pre_validation_result: PreValidationResult = await future fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) - assert (await b.add_block(block, pre_validation_results[0], sub_slot_iters=ssi, fork_info=fork_info))[ + assert (await b.add_block(block, pre_validation_result, sub_slot_iters=ssi, fork_info=fork_info))[ 0 ] == expected @@ -2628,17 +2624,16 @@ async def test_cost_exceeds_max( ) )[1] assert err in [Err.BLOCK_COST_EXCEEDS_MAX] - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [blocks[-1]], + blocks[-1], b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert results is not None - assert Err(results[0].error) == Err.BLOCK_COST_EXCEEDS_MAX + result: PreValidationResult = await future + assert Err(result.error) == Err.BLOCK_COST_EXCEEDS_MAX @pytest.mark.anyio async def test_clvm_must_not_fail(self, empty_blockchain: Blockchain, bt: BlockTools) -> None: @@ -3236,17 +3231,16 @@ async def test_invalid_agg_sig(self, empty_blockchain: Blockchain, bt: BlockTool # Bad signature also fails in prevalidation ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [last_block], + last_block, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - preval_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert preval_results is not None - assert preval_results[0].error == Err.BAD_AGGREGATE_SIGNATURE.value + preval_result: PreValidationResult = await future + assert preval_result.error == Err.BAD_AGGREGATE_SIGNATURE.value def maybe_header_hash(block: Optional[BlockRecord]) -> Optional[bytes32]: @@ -3355,14 +3349,20 @@ async def test_long_reorg( print(f"pre-validating {len(blocks)} blocks") ssi = b.constants.SUB_SLOT_ITERS_STARTING diff = b.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( - b.constants, - AugmentedBlockchain(b), - blocks, - b.pool, - {}, - ValidationState(ssi, diff, None), - ) + chain = AugmentedBlockchain(b) + vs = ValidationState(ssi, diff, None) + futures = [] + for block in blocks: + futures.append( + await pre_validate_block( + b.constants, + chain, + block, + b.pool, + None, + vs, + ) + ) pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) for i, block in enumerate(blocks): if block.height != 0 and len(block.finished_sub_slots) > 0: @@ -3922,29 +3922,29 @@ async def test_reorg_flip_flop(empty_blockchain: Blockchain, bt: BlockTools) -> block1, block2 = b1, b2 counter += 1 - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [block1], + block1, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - preval: list[PreValidationResult] = list(await asyncio.gather(*futures)) + preval = await future fork_info = ForkInfo(block1.height - 1, block1.height - 1, block1.prev_header_hash) - _, err, _ = await b.add_block(block1, preval[0], sub_slot_iters=ssi, fork_info=fork_info) + _, err, _ = await b.add_block(block1, preval, sub_slot_iters=ssi, fork_info=fork_info) assert err is None - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( b.constants, AugmentedBlockchain(b), - [block2], + block2, b.pool, - {}, + None, ValidationState(ssi, diff, None), ) - preval = list(await asyncio.gather(*futures)) + preval = await future fork_info = ForkInfo(block2.height - 1, block2.height - 1, block2.prev_header_hash) - _, err, _ = await b.add_block(block2, preval[0], sub_slot_iters=ssi, fork_info=fork_info) + _, err, _ = await b.add_block(block2, preval, sub_slot_iters=ssi, fork_info=fork_info) assert err is None peak = b.get_peak() @@ -3968,14 +3968,21 @@ async def test_get_tx_peak(default_400_blocks: list[FullBlock], empty_blockchain test_blocks = default_400_blocks[:100] ssi = bc.constants.SUB_SLOT_ITERS_STARTING diff = bc.constants.DIFFICULTY_STARTING - futures = await pre_validate_blocks_multiprocessing( - bc.constants, - AugmentedBlockchain(bc), - test_blocks, - bc.pool, - {}, - ValidationState(ssi, diff, None), - ) + futures: list[Awaitable[PreValidationResult]] = [] + chain = AugmentedBlockchain(bc) + vs = ValidationState(ssi, diff, None) + for block in test_blocks: + futures.append( + await pre_validate_block( + bc.constants, + chain, + block, + bc.pool, + None, + vs, + ) + ) + res: list[PreValidationResult] = list(await asyncio.gather(*futures)) last_tx_block_record = None diff --git a/chia/_tests/core/full_node/test_full_node.py b/chia/_tests/core/full_node/test_full_node.py index abec2bd49baf..ce1fb0624bbd 100644 --- a/chia/_tests/core/full_node/test_full_node.py +++ b/chia/_tests/core/full_node/test_full_node.py @@ -7,7 +7,7 @@ import random import time from collections.abc import Coroutine -from typing import Optional +from typing import Awaitable, Optional import pytest from chia_rs import AugSchemeMPL, G2Element, PrivateKey, SpendBundleConditions @@ -24,7 +24,7 @@ from chia._tests.util.setup_nodes import SimulatorsAndWalletsServices from chia._tests.util.time_out_assert import time_out_assert, time_out_assert_custom_interval, time_out_messages from chia.consensus.block_body_validation import ForkInfo -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.consensus.pot_iterations import is_overflow_block from chia.full_node.full_node import WalletUpdate from chia.full_node.full_node_api import FullNodeAPI @@ -440,16 +440,21 @@ async def check_transaction_confirmed(transaction) -> bool: for reorg_block in reog_blocks[:r]: await _validate_and_add_block_no_error(blockchain, reorg_block, fork_info=fork_info) for i in range(1, height): - futures = await pre_validate_blocks_multiprocessing( - blockchain.constants, - AugmentedBlockchain(blockchain), - all_blocks[:i], - blockchain.pool, - {}, - ValidationState(ssi, diff, None), - ) + vs = ValidationState(ssi, diff, None) + chain = AugmentedBlockchain(blockchain) + futures: list[Awaitable[PreValidationResult]] = [] + for block in all_blocks[:i]: + futures.append( + await pre_validate_block( + blockchain.constants, + chain, + block, + blockchain.pool, + None, + vs, + ) + ) results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert results is not None for result in results: assert result.error is None @@ -458,16 +463,14 @@ async def check_transaction_confirmed(transaction) -> bool: for block in all_blocks[:r]: await _validate_and_add_block_no_error(blockchain, block, fork_info=fork_info) for i in range(1, height): - futures = await pre_validate_blocks_multiprocessing( - blockchain.constants, - AugmentedBlockchain(blockchain), - all_blocks[:i], - blockchain.pool, - {}, - ValidationState(ssi, diff, None), - ) - results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert results is not None + vs = ValidationState(ssi, diff, None) + chain = AugmentedBlockchain(blockchain) + futures = [] + for block in all_blocks[:i]: + futures.append( + await pre_validate_block(blockchain.constants, chain, block, blockchain.pool, None, vs) + ) + results = list(await asyncio.gather(*futures)) for result in results: assert result.error is None diff --git a/chia/_tests/farmer_harvester/test_third_party_harvesters.py b/chia/_tests/farmer_harvester/test_third_party_harvesters.py index 7c0181e3708c..611b0bda1165 100644 --- a/chia/_tests/farmer_harvester/test_third_party_harvesters.py +++ b/chia/_tests/farmer_harvester/test_third_party_harvesters.py @@ -16,7 +16,7 @@ from chia.consensus.block_body_validation import ForkInfo from chia.consensus.blockchain import AddBlockResult from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.farmer.farmer import Farmer, calculate_harvester_fee_quality from chia.farmer.farmer_api import FarmerAPI from chia.full_node.full_node import FullNode @@ -438,14 +438,19 @@ async def add_test_blocks_into_full_node(blocks: list[FullBlock], full_node: Ful prev_ses_block = curr new_slot = len(block.finished_sub_slots) > 0 ssi, diff = get_next_sub_slot_iters_and_difficulty(full_node.constants, new_slot, prev_b, full_node.blockchain) - futures = await pre_validate_blocks_multiprocessing( - full_node.blockchain.constants, - AugmentedBlockchain(full_node.blockchain), - blocks, - full_node.blockchain.pool, - {}, - ValidationState(ssi, diff, prev_ses_block), - ) + futures = [] + chain = AugmentedBlockchain(full_node.blockchain) + for block in blocks: + futures.append( + await pre_validate_block( + full_node.blockchain.constants, + chain, + block, + full_node.blockchain.pool, + None, + ValidationState(ssi, diff, prev_ses_block), + ) + ) pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) assert pre_validation_results is not None and len(pre_validation_results) == len(blocks) for i in range(len(blocks)): diff --git a/chia/consensus/multiprocess_validation.py b/chia/consensus/multiprocess_validation.py index 12fe96d97de6..c355c1670e05 100644 --- a/chia/consensus/multiprocess_validation.py +++ b/chia/consensus/multiprocess_validation.py @@ -5,7 +5,7 @@ import logging import time import traceback -from collections.abc import Awaitable, Sequence +from collections.abc import Awaitable from concurrent.futures import Executor from dataclasses import dataclass from typing import Optional @@ -73,7 +73,7 @@ def _run_block( ) -def pre_validate_block( +def _pre_validate_block( constants: ConsensusConstants, blockchain: BlockRecordsProtocol, block: FullBlock, @@ -96,11 +96,10 @@ def pre_validate_block( tx_additions: list[Coin] = [] removals: list[bytes32] = [] if conds is not None: + assert conds.validated_signature is True + assert block.transactions_generator is not None removals, tx_additions = tx_removals_and_additions(conds) elif block.transactions_generator is not None: - # TODO: this function would be simpler if conds was - # required to be passed in for all transaction blocks. We would - # no longer need prev_generators assert prev_generators is not None assert block.transactions_info is not None @@ -147,21 +146,21 @@ def pre_validate_block( return PreValidationResult(uint16(Err.UNKNOWN.value), None, None, uint32(validation_time * 1000)) -async def pre_validate_blocks_multiprocessing( +async def pre_validate_block( constants: ConsensusConstants, blockchain: AugmentedBlockchain, - blocks: Sequence[FullBlock], + block: FullBlock, pool: Executor, - block_height_conds_map: dict[uint32, SpendBundleConditions], + conds: Optional[SpendBundleConditions], vs: ValidationState, *, wp_summaries: Optional[list[SubEpochSummary]] = None, -) -> Sequence[Awaitable[PreValidationResult]]: +) -> Awaitable[PreValidationResult]: """ This method must be called under the blockchain lock - The blocks passed to this function are submitted to be validated in the - executor passed in as "pool". The futures for those jobs are then returned. - When awaited, the return value is the PreValidationResult for each block. + The block passed to this function is submitted to be validated in the + executor passed in as "pool". The future for the job is then returned. + When awaited, the return value is the PreValidationResult for the block. The PreValidationResult indicates whether the block was valid or not. Args: @@ -171,10 +170,12 @@ async def pre_validate_blocks_multiprocessing( be included, even if they haven't been added to the underlying blockchain database yet. The blocks passed in will be added/augmented onto this blockchain. pool: The executor to submit the validation jobs to - blocks: list of full blocks to validate (must be connected to current chain) - vs: The ValidationState refers to the state for the first block in the batch. + block: The full block to validate (must be connected to current chain) + conds: The SpendBundleConditions for transaction blocks, if we have one. + This will be computed if None is passed. + vs: The ValidationState refers to the state for the block. This is an in-out parameter that will be updated to the validation state - for the next batch of blocks. It includes subslot iterators, difficulty and + for the next block. It includes subslot iterators, difficulty and the previous sub epoch summary (ses) block. wp_summaries: validate_signatures: @@ -184,89 +185,81 @@ async def pre_validate_blocks_multiprocessing( async def return_error(error_code: Err) -> PreValidationResult: return PreValidationResult(uint16(error_code.value), None, None, uint32(0)) - if blocks[0].height > 0: - curr = blockchain.try_block_record(blocks[0].prev_header_hash) + if block.height > 0: + curr = blockchain.try_block_record(block.prev_header_hash) if curr is None: - return [return_error(Err.INVALID_PREV_BLOCK_HASH)] + return return_error(Err.INVALID_PREV_BLOCK_HASH) prev_b = curr - futures = [] - # Pool of workers to validate blocks concurrently - - for block in blocks: - assert isinstance(block, FullBlock) - if len(block.finished_sub_slots) > 0: - if block.finished_sub_slots[0].challenge_chain.new_difficulty is not None: - vs.current_difficulty = block.finished_sub_slots[0].challenge_chain.new_difficulty - if block.finished_sub_slots[0].challenge_chain.new_sub_slot_iters is not None: - vs.current_ssi = block.finished_sub_slots[0].challenge_chain.new_sub_slot_iters - overflow = is_overflow_block(constants, block.reward_chain_block.signage_point_index) - challenge = get_block_challenge(constants, block, blockchain, prev_b is None, overflow, False) - if block.reward_chain_block.challenge_chain_sp_vdf is None: - cc_sp_hash: bytes32 = challenge - else: - cc_sp_hash = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash() - q_str: Optional[bytes32] = verify_and_get_quality_string( - block.reward_chain_block.proof_of_space, constants, challenge, cc_sp_hash, height=block.height - ) - if q_str is None: - return [return_error(Err.INVALID_POSPACE)] - - required_iters: uint64 = calculate_iterations_quality( - constants.DIFFICULTY_CONSTANT_FACTOR, - q_str, - block.reward_chain_block.proof_of_space.size, - vs.current_difficulty, - cc_sp_hash, - ) + assert isinstance(block, FullBlock) + if len(block.finished_sub_slots) > 0: + if block.finished_sub_slots[0].challenge_chain.new_difficulty is not None: + vs.current_difficulty = block.finished_sub_slots[0].challenge_chain.new_difficulty + if block.finished_sub_slots[0].challenge_chain.new_sub_slot_iters is not None: + vs.current_ssi = block.finished_sub_slots[0].challenge_chain.new_sub_slot_iters + overflow = is_overflow_block(constants, block.reward_chain_block.signage_point_index) + challenge = get_block_challenge(constants, block, blockchain, prev_b is None, overflow, False) + if block.reward_chain_block.challenge_chain_sp_vdf is None: + cc_sp_hash: bytes32 = challenge + else: + cc_sp_hash = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash() + q_str: Optional[bytes32] = verify_and_get_quality_string( + block.reward_chain_block.proof_of_space, constants, challenge, cc_sp_hash, height=block.height + ) + if q_str is None: + return return_error(Err.INVALID_POSPACE) + + required_iters: uint64 = calculate_iterations_quality( + constants.DIFFICULTY_CONSTANT_FACTOR, + q_str, + block.reward_chain_block.proof_of_space.size, + vs.current_difficulty, + cc_sp_hash, + ) - try: - block_rec = block_to_block_record( - constants, - blockchain, - required_iters, - block, - sub_slot_iters=vs.current_ssi, - prev_ses_block=vs.prev_ses_block, - ) - except ValueError: - log.exception("block_to_block_record()") - return [return_error(Err.INVALID_SUB_EPOCH_SUMMARY)] - - if block_rec.sub_epoch_summary_included is not None and wp_summaries is not None: - next_ses = wp_summaries[int(block.height / constants.SUB_EPOCH_BLOCKS) - 1] - if not block_rec.sub_epoch_summary_included.get_hash() == next_ses.get_hash(): - log.error("sub_epoch_summary does not match wp sub_epoch_summary list") - return [return_error(Err.INVALID_SUB_EPOCH_SUMMARY)] - - blockchain.add_extra_block(block, block_rec) # Temporarily add block to chain - prev_b = block_rec - - previous_generators: Optional[list[bytes]] = None - - try: - block_generator: Optional[BlockGenerator] = await get_block_generator( - blockchain.lookup_block_generators, block - ) - if block_generator is not None: - previous_generators = block_generator.generator_refs - except ValueError: - return [return_error(Err.FAILED_GETTING_GENERATOR_MULTIPROCESSING)] - - futures.append( - asyncio.get_running_loop().run_in_executor( - pool, - pre_validate_block, - constants, - blockchain, - block, - previous_generators, - block_height_conds_map.get(block.height), - copy.copy(vs), - ) + try: + block_rec = block_to_block_record( + constants, + blockchain, + required_iters, + block, + sub_slot_iters=vs.current_ssi, + prev_ses_block=vs.prev_ses_block, ) + except ValueError: + log.exception("block_to_block_record()") + return return_error(Err.INVALID_SUB_EPOCH_SUMMARY) + + if block_rec.sub_epoch_summary_included is not None and wp_summaries is not None: + next_ses = wp_summaries[int(block.height / constants.SUB_EPOCH_BLOCKS) - 1] + if not block_rec.sub_epoch_summary_included.get_hash() == next_ses.get_hash(): + log.error("sub_epoch_summary does not match wp sub_epoch_summary list") + return return_error(Err.INVALID_SUB_EPOCH_SUMMARY) + + blockchain.add_extra_block(block, block_rec) # Temporarily add block to chain + prev_b = block_rec + + previous_generators: Optional[list[bytes]] = None + + try: + block_generator: Optional[BlockGenerator] = await get_block_generator(blockchain.lookup_block_generators, block) + if block_generator is not None: + previous_generators = block_generator.generator_refs + except ValueError: + return return_error(Err.FAILED_GETTING_GENERATOR_MULTIPROCESSING) + + future = asyncio.get_running_loop().run_in_executor( + pool, + _pre_validate_block, + constants, + blockchain, + block, + previous_generators, + conds, + copy.copy(vs), + ) - if block_rec.sub_epoch_summary_included is not None: - vs.prev_ses_block = block_rec + if block_rec.sub_epoch_summary_included is not None: + vs.prev_ses_block = block_rec - return futures + return future diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 7f3a83001228..8face1950610 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -33,7 +33,7 @@ from chia.consensus.cost_calculator import NPCResult from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.consensus.pot_iterations import calculate_sp_iters from chia.full_node.block_store import BlockStore from chia.full_node.coin_store import CoinStore @@ -1526,7 +1526,7 @@ async def prevalidate_blocks( wp_summaries: Optional[list[SubEpochSummary]] = None, ) -> Sequence[Awaitable[PreValidationResult]]: """ - This is a thin wrapper over pre_validate_blocks_multiprocessing(). + This is a thin wrapper over pre_validate_block(). Args: blockchain: @@ -1540,17 +1540,22 @@ async def prevalidate_blocks( # Validates signatures in multiprocessing since they take a while, and we don't have cached transactions # for these blocks (unlike during normal operation where we validate one at a time) # We have to copy the ValidationState object to preserve it for the add_block() - # call below. pre_validate_blocks_multiprocessing() will update the + # call below. pre_validate_block() will update the # object we pass in. - return await pre_validate_blocks_multiprocessing( - self.constants, - blockchain, - blocks_to_validate, - self.blockchain.pool, - {}, - vs, - wp_summaries=wp_summaries, - ) + ret: list[Awaitable[PreValidationResult]] = [] + for block in blocks_to_validate: + ret.append( + await pre_validate_block( + self.constants, + blockchain, + block, + self.blockchain.pool, + None, + vs, + wp_summaries=wp_summaries, + ) + ) + return ret async def add_prevalidated_blocks( self, @@ -2038,9 +2043,9 @@ async def add_block( return None validation_start = time.monotonic() # Tries to add the block to the blockchain, if we already validated transactions, don't do it again - block_height_conds_map = {} + conds = None if pre_validation_result is not None and pre_validation_result.conds is not None: - block_height_conds_map[block.height] = pre_validation_result.conds + conds = pre_validation_result.conds # Don't validate signatures because we want to validate them in the main thread later, since we have a # cache available @@ -2055,36 +2060,34 @@ async def add_block( prev_ses_block = curr new_slot = len(block.finished_sub_slots) > 0 ssi, diff = get_next_sub_slot_iters_and_difficulty(self.constants, new_slot, prev_b, self.blockchain) - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( self.blockchain.constants, AugmentedBlockchain(self.blockchain), - [block], + block, self.blockchain.pool, - block_height_conds_map, + conds, ValidationState(ssi, diff, prev_ses_block), ) - pre_validation_results = list(await asyncio.gather(*futures)) + pre_validation_result = await future added: Optional[AddBlockResult] = None pre_validation_time = time.monotonic() - validation_start try: - if len(pre_validation_results) < 1: - raise ValueError(f"Failed to validate block {header_hash} height {block.height}") - if pre_validation_results[0].error is not None: - if Err(pre_validation_results[0].error) == Err.INVALID_PREV_BLOCK_HASH: + if pre_validation_result.error is not None: + if Err(pre_validation_result.error) == Err.INVALID_PREV_BLOCK_HASH: added = AddBlockResult.DISCONNECTED_BLOCK error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH - elif Err(pre_validation_results[0].error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE: + elif Err(pre_validation_result.error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE: raise TimestampError() else: raise ValueError( f"Failed to validate block {header_hash} height " - f"{block.height}: {Err(pre_validation_results[0].error).name}" + f"{block.height}: {Err(pre_validation_result.error).name}" ) else: result_to_validate = ( - pre_validation_results[0] if pre_validation_result is None else pre_validation_result + pre_validation_result if pre_validation_result is None else pre_validation_result ) - assert result_to_validate.required_iters == pre_validation_results[0].required_iters + assert result_to_validate.required_iters == pre_validation_result.required_iters fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) (added, error_code, state_change_summary) = await self.blockchain.add_block( block, result_to_validate, ssi, fork_info @@ -2142,7 +2145,7 @@ async def add_block( logging.WARNING if validation_time > 2 else logging.DEBUG, f"Block validation: {validation_time:0.2f}s, " f"pre_validation: {pre_validation_time:0.2f}s, " - f"CLVM: {pre_validation_results[0].timing / 1000.0:0.2f}s, " + f"CLVM: {pre_validation_result.timing / 1000.0:0.2f}s, " f"post-process: {post_process_time:0.2f}s, " f"cost: {block.transactions_info.cost if block.transactions_info is not None else 'None'}" f"{percent_full_str} header_hash: {header_hash.hex()} height: {block.height}", diff --git a/chia/simulator/full_node_simulator.py b/chia/simulator/full_node_simulator.py index 72e9c9b0da32..9dd3de33622b 100644 --- a/chia/simulator/full_node_simulator.py +++ b/chia/simulator/full_node_simulator.py @@ -12,7 +12,7 @@ from chia.consensus.block_record import BlockRecord from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward from chia.consensus.blockchain import BlockchainMutexPriority -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_blocks_multiprocessing +from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block from chia.full_node.full_node import FullNode from chia.full_node.full_node_api import FullNodeAPI from chia.rpc.rpc_server import default_get_connections @@ -175,20 +175,19 @@ async def farm_new_transaction_block( current_blocks = await self.get_all_full_blocks() if len(current_blocks) == 0: genesis = self.bt.get_consecutive_blocks(uint8(1))[0] - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( self.full_node.blockchain.constants, AugmentedBlockchain(self.full_node.blockchain), - [genesis], + genesis, self.full_node.blockchain.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert pre_validation_results is not None + pre_validation_result: PreValidationResult = await future fork_info = ForkInfo(-1, -1, self.full_node.constants.GENESIS_CHALLENGE) await self.full_node.blockchain.add_block( genesis, - pre_validation_results[0], + pre_validation_result, self.full_node.constants.SUB_SLOT_ITERS_STARTING, fork_info, ) @@ -237,18 +236,17 @@ async def farm_new_block(self, request: FarmNewBlockProtocol, force_wait_for_tim current_blocks = await self.get_all_full_blocks() if len(current_blocks) == 0: genesis = self.bt.get_consecutive_blocks(uint8(1))[0] - futures = await pre_validate_blocks_multiprocessing( + future = await pre_validate_block( self.full_node.blockchain.constants, AugmentedBlockchain(self.full_node.blockchain), - [genesis], + genesis, self.full_node.blockchain.pool, - {}, + None, ValidationState(ssi, diff, None), ) - pre_validation_results: list[PreValidationResult] = list(await asyncio.gather(*futures)) - assert pre_validation_results is not None + pre_validation_result: PreValidationResult = await future fork_info = ForkInfo(-1, -1, self.full_node.constants.GENESIS_CHALLENGE) - await self.full_node.blockchain.add_block(genesis, pre_validation_results[0], ssi, fork_info) + await self.full_node.blockchain.add_block(genesis, pre_validation_result, ssi, fork_info) peak = self.full_node.blockchain.get_peak() assert peak is not None curr: BlockRecord = peak