diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 73fd4ca346b..ad473543715 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -137,6 +137,11 @@ class producer_plugin_impl : public std::enable_shared_from_this(); - auto& persisted_by_expiry = _persistent_transactions.get(); - if (!persisted_by_expiry.empty()) { - int num_expired_persistent = 0; - int orig_count = _persistent_transactions.size(); + if( !remove_expired_persisted_trxs( preprocess_deadline ) ) + return start_block_result::exhausted; - while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) { - if (preprocess_deadline <= fc::time_point::now()) { - exhausted = true; - break; - } - auto const& txid = persisted_by_expiry.begin()->trx_id; - if (_pending_block_mode == pending_block_mode::producing) { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}", - ("block_num", chain.head_block_num() + 1) - ("prod", chain.pending_block_state()->header.producer) - ("txid", txid)); - } else { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}", - ("txid", txid)); - } + if( !remove_expired_blacklisted_trxs( preprocess_deadline ) ) + return start_block_result::exhausted; + + // limit execution of pending incoming to once per block + size_t pending_incoming_process_limit = _pending_incoming_transactions.size(); - persisted_by_expiry.erase(persisted_by_expiry.begin()); - num_expired_persistent++; + if( !process_unapplied_trxs( preprocess_deadline ) ) + return start_block_result::exhausted; + + if (_pending_block_mode == pending_block_mode::producing) { + auto scheduled_trx_deadline = preprocess_deadline; + if (_max_scheduled_transaction_time_per_block_ms >= 0) { + scheduled_trx_deadline = std::min( + scheduled_trx_deadline, + fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) + ); + } + // may exhaust scheduled_trx_deadline but not preprocess_deadline, exhausted preprocess_deadline checked below + process_scheduled_and_incoming_trxs( scheduled_trx_deadline, pending_incoming_process_limit ); } - if( exhausted ) { - fc_wlog( _log, "Unable to process all ${n} persisted transactions before deadline, Expired ${expired}", - ( "n", orig_count ) - ( "expired", num_expired_persistent ) ); + if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() + return start_block_result::failed; + if (preprocess_deadline <= fc::time_point::now()) { + return start_block_result::exhausted; } else { - fc_dlog( _log, "Processed ${n} persisted transactions, Expired ${expired}", - ( "n", orig_count ) - ( "expired", num_expired_persistent ) ); + if( !process_incoming_trxs( preprocess_deadline, pending_incoming_process_limit ) ) + return start_block_result::exhausted; + return start_block_result::succeeded; } + + } catch ( const guard_exception& e ) { + chain_plugin::handle_guard_exception(e); + return start_block_result::failed; + } catch ( std::bad_alloc& ) { + chain_plugin::handle_bad_alloc(); + } catch ( boost::interprocess::bad_alloc& ) { + chain_plugin::handle_db_exhaustion(); } - try { - size_t orig_pending_txn_size = _pending_incoming_transactions.size(); - - // Processing unapplied transactions... - // - if (_producers.empty() && persisted_by_id.empty()) { - // if this node can never produce and has no persisted transactions, - // there is no need for unapplied transactions they can be dropped - chain.get_unapplied_transactions().clear(); - } else { - // derive appliable transactions from unapplied_transactions and drop droppable transactions - unapplied_transactions_type& unapplied_trxs = chain.get_unapplied_transactions(); - if( !unapplied_trxs.empty() ) { - auto unapplied_trxs_size = unapplied_trxs.size(); - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; - auto calculate_transaction_category = [&](const transaction_metadata_ptr& trx) { - if (trx->packed_trx->expiration() < pbs->header.timestamp.to_time_point()) { - return tx_category::EXPIRED; - } else if (persisted_by_id.find(trx->id) != persisted_by_id.end()) { - return tx_category::PERSISTED; - } else { - return tx_category::UNEXPIRED_UNPERSISTED; - } - }; - - auto itr = unapplied_trxs.begin(); - while( itr != unapplied_trxs.end() ) { - auto itr_next = itr; // save off next since itr may be invalidated by loop - ++itr_next; - - if( preprocess_deadline <= fc::time_point::now() ) exhausted = true; - if( exhausted ) break; - const transaction_metadata_ptr trx = itr->second; - auto category = calculate_transaction_category(trx); - if (category == tx_category::EXPIRED || - (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) - { - if (!_producers.empty()) { - fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}", - ("txid", trx->id)); - } - itr = unapplied_trxs.erase( itr ); // unapplied_trxs map has not been modified, so simply erase and continue - continue; - } else if (category == tx_category::PERSISTED || - (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) - { - ++num_processed; - - try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); - bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && preprocess_deadline < deadline)) { - deadline_is_subjective = true; - deadline = preprocess_deadline; - } - - auto trace = chain.push_transaction(trx, deadline); - if (trace->except) { - if (failure_is_subjective(*trace->except, deadline_is_subjective)) { - exhausted = true; - break; - } else { - // this failed our configured maximum transaction time, we don't want to replay it - // chain.plus_transactions can modify unapplied_trxs, so erase by id - unapplied_trxs.erase( trx->signed_id ); - ++num_failed; - } - } else { - ++num_applied; - } - } LOG_AND_DROP(); - } + } - itr = itr_next; - } + return start_block_result::failed; +} - fc_dlog(_log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}", - ("m", num_processed) - ("n", unapplied_trxs_size) - ("applied", num_applied) - ("failed", num_failed)); - } +bool producer_plugin_impl::remove_expired_persisted_trxs( const fc::time_point& deadline ) +{ + bool exhausted = false; + chain::controller& chain = chain_plug->chain(); + const auto& pbs = chain.pending_block_state(); + auto& persisted_by_expiry = _persistent_transactions.get(); + if (!persisted_by_expiry.empty()) { + int num_expired_persistent = 0; + int orig_count = _persistent_transactions.size(); + + while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) { + if (deadline <= fc::time_point::now()) { + exhausted = true; + break; } - + auto const& txid = persisted_by_expiry.begin()->trx_id; if (_pending_block_mode == pending_block_mode::producing) { - auto& blacklist_by_id = _blacklisted_transactions.get(); - auto& blacklist_by_expiry = _blacklisted_transactions.get(); - auto now = fc::time_point::now(); - if(!blacklist_by_expiry.empty()) { - int num_expired = 0; - int orig_count = _blacklisted_transactions.size(); - - while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) { - if (preprocess_deadline <= fc::time_point::now()) break; - blacklist_by_expiry.erase(blacklist_by_expiry.begin()); - num_expired++; - } - - fc_dlog(_log, "Processed ${n} blacklisted transactions, Expired ${expired}", - ("n", orig_count) - ("expired", num_expired)); - } - - // scheduled transactions - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; + fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}", + ("block_num", chain.head_block_num() + 1) + ("prod", chain.pending_block_state()->header.producer) + ("txid", txid)); + } else { + fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}", + ("txid", txid)); + } - auto scheduled_trx_deadline = preprocess_deadline; - if (_max_scheduled_transaction_time_per_block_ms >= 0) { - scheduled_trx_deadline = std::min( - scheduled_trx_deadline, - fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) - ); - } - time_point pending_block_time = chain.pending_block_time(); - const auto& sch_idx = chain.db().get_index(); - const auto scheduled_trxs_size = sch_idx.size(); - auto sch_itr = sch_idx.begin(); - while( sch_itr != sch_idx.end() ) { - if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet - if( sch_itr->published >= pending_block_time ) { - ++sch_itr; - continue; // do not allow schedule and execute in same block - } - if( scheduled_trx_deadline <= fc::time_point::now() ) { - exhausted = true; - break; - } + persisted_by_expiry.erase(persisted_by_expiry.begin()); + num_expired_persistent++; + } - const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated - if (blacklist_by_id.find(trx_id) != blacklist_by_id.end()) { - ++sch_itr; - continue; - } + if( exhausted ) { + fc_wlog( _log, "Unable to process all ${n} persisted transactions before deadline, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } else { + fc_dlog( _log, "Processed ${n} persisted transactions, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } + } + return !exhausted; +} - auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop - ++sch_itr_next; - const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until; - const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id; +bool producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point& deadline ) +{ + bool exhausted = false; + auto& blacklist_by_expiry = _blacklisted_transactions.get(); + auto now = fc::time_point::now(); + if(!blacklist_by_expiry.empty()) { + int num_expired = 0; + int orig_count = _blacklisted_transactions.size(); + + while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) { + if (deadline <= fc::time_point::now()) { + exhausted = true; + break; + } + blacklist_by_expiry.erase(blacklist_by_expiry.begin()); + num_expired++; + } - num_processed++; + fc_dlog(_log, "Processed ${n} blacklisted transactions, Expired ${expired}", + ("n", orig_count) + ("expired", num_expired)); + } + return !exhausted; +} - // configurable ratio of incoming txns vs deferred txns - while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) { - if (scheduled_trx_deadline <= fc::time_point::now()) break; +bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadline ) +{ + chain::controller& chain = chain_plug->chain(); + auto& persisted_by_id = _persistent_transactions.get(); - auto e = _pending_incoming_transactions.front(); - _pending_incoming_transactions.pop_front(); - --orig_pending_txn_size; - _incoming_trx_weight -= 1.0; - process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - } + bool exhausted = false; + // Processing unapplied transactions... + // + if (_producers.empty() && persisted_by_id.empty()) { + // if this node can never produce and has no persisted transactions, + // there is no need for unapplied transactions they can be dropped + chain.get_unapplied_transactions().clear(); + } else { + const auto& pbs = chain.pending_block_state(); + // derive appliable transactions from unapplied_transactions and drop droppable transactions + unapplied_transactions_type& unapplied_trxs = chain.get_unapplied_transactions(); + if( !unapplied_trxs.empty() ) { + auto unapplied_trxs_size = unapplied_trxs.size(); + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; + auto calculate_transaction_category = [&](const transaction_metadata_ptr& trx) { + if (trx->packed_trx->expiration() < pbs->header.timestamp.to_time_point()) { + return tx_category::EXPIRED; + } else if (persisted_by_id.find(trx->id) != persisted_by_id.end()) { + return tx_category::PERSISTED; + } else { + return tx_category::UNEXPIRED_UNPERSISTED; + } + }; - if (scheduled_trx_deadline <= fc::time_point::now()) { - exhausted = true; - break; + auto itr = unapplied_trxs.begin(); + while( itr != unapplied_trxs.end() ) { + auto itr_next = itr; // save off next since itr may be invalidated by loop + ++itr_next; + + if( deadline <= fc::time_point::now() ) exhausted = true; + if( exhausted ) break; + const transaction_metadata_ptr trx = itr->second; + auto category = calculate_transaction_category(trx); + if (category == tx_category::EXPIRED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) + { + if (!_producers.empty()) { + fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}", + ("txid", trx->id)); } + itr = unapplied_trxs.erase( itr ); // unapplied_trxs map has not been modified, so simply erase and continue + continue; + } else if (category == tx_category::PERSISTED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) + { + ++num_processed; try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + auto trx_deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && scheduled_trx_deadline < deadline)) { + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && deadline < trx_deadline)) { deadline_is_subjective = true; - deadline = scheduled_trx_deadline; + trx_deadline = deadline; } - auto trace = chain.push_scheduled_transaction(trx_id, deadline); + auto trace = chain.push_transaction(trx, trx_deadline); if (trace->except) { if (failure_is_subjective(*trace->except, deadline_is_subjective)) { exhausted = true; break; } else { - auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); - // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist - _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, expiration}); - num_failed++; + // this failed our configured maximum transaction time, we don't want to replay it + // chain.plus_transactions can modify unapplied_trxs, so erase by id + unapplied_trxs.erase( trx->signed_id ); + ++num_failed; } } else { - num_applied++; + ++num_applied; } } LOG_AND_DROP(); + } - _incoming_trx_weight += _incoming_defer_ratio; - if (!orig_pending_txn_size) _incoming_trx_weight = 0.0; + itr = itr_next; + } - if( sch_itr_next == sch_idx.end() ) break; - sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) ); - } + fc_dlog(_log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}", + ("m", num_processed)("n", unapplied_trxs_size)("applied", num_applied)("failed", num_failed)); + } + } + return !exhausted; +} - if( scheduled_trxs_size > 0 ) { - fc_dlog( _log, - "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", - ( "m", num_processed ) - ( "n", scheduled_trxs_size ) - ( "applied", num_applied ) - ( "failed", num_failed ) ); - } +bool producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ) +{ + chain::controller& chain = chain_plug->chain(); + auto& blacklist_by_id = _blacklisted_transactions.get(); + // scheduled transactions + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; + bool exhausted = false; + double incoming_trx_weight = 0.0; + + time_point pending_block_time = chain.pending_block_time(); + const auto& sch_idx = chain.db().get_index(); + const auto scheduled_trxs_size = sch_idx.size(); + auto sch_itr = sch_idx.begin(); + while( sch_itr != sch_idx.end() ) { + if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet + if( sch_itr->published >= pending_block_time ) { + ++sch_itr; + continue; // do not allow schedule and execute in same block + } + if( deadline <= fc::time_point::now() ) { + exhausted = true; + break; + } + + const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated + if (blacklist_by_id.find(trx_id) != blacklist_by_id.end()) { + ++sch_itr; + continue; + } + + auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop + ++sch_itr_next; + const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until; + const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id; + + num_processed++; + + // configurable ratio of incoming txns vs deferred txns + while (incoming_trx_weight >= 1.0 && pending_incoming_process_limit && _pending_incoming_transactions.size()) { + if (deadline <= fc::time_point::now()) break; + + auto e = _pending_incoming_transactions.front(); + _pending_incoming_transactions.pop_front(); + --pending_incoming_process_limit; + incoming_trx_weight -= 1.0; + process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } + if (deadline <= fc::time_point::now()) { + exhausted = true; + break; + } + + try { + auto trx_deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + bool deadline_is_subjective = false; + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && deadline < trx_deadline)) { + deadline_is_subjective = true; + trx_deadline = deadline; } - if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() - return start_block_result::failed; - if (exhausted || preprocess_deadline <= fc::time_point::now()) { - return start_block_result::exhausted; - } else { - // attempt to apply any pending incoming transactions - _incoming_trx_weight = 0.0; - - if (!_pending_incoming_transactions.empty()) { - fc_dlog(_log, "Processing ${n} pending transactions", ("n", _pending_incoming_transactions.size())); - while (orig_pending_txn_size && _pending_incoming_transactions.size()) { - if (preprocess_deadline <= fc::time_point::now()) return start_block_result::exhausted; - auto e = _pending_incoming_transactions.front(); - _pending_incoming_transactions.pop_front(); - --orig_pending_txn_size; - process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - } + auto trace = chain.push_scheduled_transaction(trx_id, trx_deadline); + if (trace->except) { + if (failure_is_subjective(*trace->except, deadline_is_subjective)) { + exhausted = true; + break; + } else { + auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); + // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist + _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, expiration}); + num_failed++; } - return start_block_result::succeeded; + } else { + num_applied++; } + } LOG_AND_DROP(); - } catch ( const guard_exception& e ) { - chain_plugin::handle_guard_exception(e); - return start_block_result::failed; - } catch ( std::bad_alloc& ) { - chain_plugin::handle_bad_alloc(); - } catch ( boost::interprocess::bad_alloc& ) { - chain_plugin::handle_db_exhaustion(); - } + incoming_trx_weight += _incoming_defer_ratio; + if (!pending_incoming_process_limit) incoming_trx_weight = 0.0; + if( sch_itr_next == sch_idx.end() ) break; + sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) ); } - return start_block_result::failed; + if( scheduled_trxs_size > 0 ) { + fc_dlog( _log, + "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", + ( "m", num_processed )( "n", scheduled_trxs_size )( "applied", num_applied )( "failed", num_failed ) ); + } + return !exhausted; +} + +bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit ) +{ + bool exhausted = false; + if (!_pending_incoming_transactions.empty()) { + fc_dlog(_log, "Processing ${n} pending transactions", ("n", _pending_incoming_transactions.size())); + while (pending_incoming_process_limit && _pending_incoming_transactions.size()) { + if (deadline <= fc::time_point::now()) { + exhausted = true; + break; + } + auto e = _pending_incoming_transactions.front(); + _pending_incoming_transactions.pop_front(); + --pending_incoming_process_limit; + process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } + } + return !exhausted; } void producer_plugin_impl::schedule_production_loop() {