diff --git a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs index e34503b5d..1a23f08ee 100644 --- a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs +++ b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs @@ -26,7 +26,7 @@ use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tracing::{debug, info, info_span, warn, Instrument}; -const GC_INTERVAL: Duration = Duration::from_secs(30); +const GC_INTERVAL: Duration = Duration::from_secs(10); const TOO_NEW_TOLERANCE: u64 = 32; pub struct TransactionPipe { @@ -102,9 +102,36 @@ impl TransactionPipe { } } + pub(crate) async fn gc(&mut self) -> Result<(), Error> { + // todo: these will be slightly off, but gc does not need to be exact + let now = Instant::now(); + let epoch_ms_now = chrono::Utc::now().timestamp_millis() as u64; + + // garbage collect the used sequence number pool + self.used_sequence_number_pool.gc(epoch_ms_now); + + // garbage collect the transactions in flight + { + // unwrap because failure indicates poisoned lock + let mut transactions_in_flight = self.transactions_in_flight.write().unwrap(); + transactions_in_flight.gc(epoch_ms_now); + } + + // garbage collect the core mempool + self.core_mempool.gc(); + + self.last_gc = now; + + Ok(()) + } + /// Pipes a batch of transactions from the mempool to the transaction channel. /// todo: it may be wise to move the batching logic up a level to the consuming structs. pub(crate) async fn tick(&mut self) -> Result<(), Error> { + if self.last_gc.elapsed() >= GC_INTERVAL { + self.gc().await?; + } + let next = self.mempool_client_receiver.next().await; if let Some(request) = next { match request { @@ -132,27 +159,6 @@ impl TransactionPipe { return Err(Error::InputClosed); } - if self.last_gc.elapsed() >= GC_INTERVAL { - // todo: these will be slightly off, but gc does not need to be exact - let now = Instant::now(); - let epoch_ms_now = chrono::Utc::now().timestamp_millis() as u64; - - // garbage collect the used sequence number pool - self.used_sequence_number_pool.gc(epoch_ms_now); - - // garbage collect the transactions in flight - { - // unwrap because failure indicates poisoned lock - let mut transactions_in_flight = self.transactions_in_flight.write().unwrap(); - transactions_in_flight.gc(epoch_ms_now); - } - - // garbage collect the core mempool - self.core_mempool.gc(); - - self.last_gc = now; - } - Ok(()) } @@ -270,7 +276,7 @@ impl TransactionPipe { debug!("Adding transaction to mempool: {:?} {:?}", transaction, sequence_number); let status = self.core_mempool.add_txn( transaction.clone(), - 0, + tx_result.score(), sequence_number, TimelineState::NonQualified, true, @@ -312,6 +318,18 @@ impl TransactionPipe { // report status Ok((status, None)) } + + /// Sets the last gc time to a provided time + #[cfg(test)] + async fn set_last_gc(&mut self, last_gc: Instant) { + self.last_gc = last_gc; + } + + /// Unsets the sequence number for a given account + #[cfg(test)] + fn unset_sequence_number(&mut self, account_address: &AccountAddress) { + self.used_sequence_number_pool.remove_sequence_number(account_address); + } } #[cfg(test)] @@ -614,4 +632,83 @@ mod tests { Ok(()) } + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_gc_before_validating() -> Result<(), anyhow::Error> { + // create a transaction pipe + let (tx_sender, _tx_receiver) = mpsc::channel(16); + let (executor, _tempdir) = Executor::try_test_default(GENESIS_KEYPAIR.0.clone())?; + let (context, background) = executor.background(tx_sender)?; + let mut transaction_pipe = background.into_transaction_pipe(); + + // generate a transaction with sequence number 1, to which we will try to submit three times + let transaction = create_signed_transaction(1, &context.config().chain); + + let start = Instant::now(); + // send the transaction to the mempool, expect success + let submission_result = transaction_pipe.submit_transaction(transaction).await?; + match submission_result.0.code { + MempoolStatusCode::Accepted => { + info!("Transaction accepted"); + } + _ => { + return Err(anyhow::anyhow!( + "Expected transaction to be accepted, but got {:?}", + submission_result.0.code + )); + } + } + + // send the transaction that uses the same sequence number, expect failure + let transaction = create_signed_transaction(1, &context.config().chain); + let submission_result = transaction_pipe.submit_transaction(transaction).await?; + match submission_result.0.code { + MempoolStatusCode::Accepted => { + return Err(anyhow::anyhow!( + "Expected transaction to be rejected, but got accepted" + )); + } + MempoolStatusCode::UnknownStatus => { + return Err(anyhow::anyhow!( + "Expected transaction to be rejected, but got unknown status" + )); + } + status => { + info!("Transaction rejected with status: {:?}", status); + } + } + + // Test transactions and core mempool use the system clock, so we need to sleep + tokio::time::sleep(Duration::from_secs(30)).await; + + // set the last gc time to a time greater than the expiration s.t. the transaction is garbage collected on the next submission + transaction_pipe.set_last_gc(start - (GC_INTERVAL * 2)).await; + + // send the transaction to the mempool, expect success because the duplicate has been garbage collected + let transaction = create_signed_transaction(1, &context.config().chain); + + // todo: gc needs to be rewritten for the sequence number mempool to respect transaction expiration, see #966 + // for now, we will manually remove this + transaction_pipe.unset_sequence_number(&transaction.sender()); + + // todo: eventually we will want to update this test to check the background task pipe is running. However, because of limited instrumentation, we will reserve this for a future PR. + // Manually run the garbage collection + transaction_pipe.gc().await?; + + /*let submission_result = transaction_pipe.submit_transaction(transaction).await?; + match submission_result.0.code { + MempoolStatusCode::Accepted => { + info!("Transaction accepted"); + } + _ => { + return Err(anyhow::anyhow!( + "Expected transaction to be accepted, but got {:?}", + submission_result.0.code + )); + } + }*/ + + Ok(()) + } } diff --git a/protocol-units/execution/maptos/opt-executor/src/executor/initialization.rs b/protocol-units/execution/maptos/opt-executor/src/executor/initialization.rs index c8e0e0c17..effbda619 100644 --- a/protocol-units/execution/maptos/opt-executor/src/executor/initialization.rs +++ b/protocol-units/execution/maptos/opt-executor/src/executor/initialization.rs @@ -134,6 +134,7 @@ impl Executor { // replace the db path with the temporary directory maptos_config.chain.maptos_db_path.replace(tempdir.path().to_path_buf()); + let executor = Self::try_from_config(maptos_config)?; Ok((executor, tempdir)) }