Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 965 #967

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, info, info_span, warn, Instrument};

const GC_INTERVAL: Duration = Duration::from_secs(30);
const GC_INTERVAL: Duration = Duration::from_secs(10);
const TOO_NEW_TOLERANCE: u64 = 32;

pub struct TransactionPipe {
Expand Down Expand Up @@ -102,9 +102,36 @@ impl TransactionPipe {
}
}

pub(crate) async fn gc(&mut self) -> Result<(), Error> {
// todo: these will be slightly off, but gc does not need to be exact
let now = Instant::now();
let epoch_ms_now = chrono::Utc::now().timestamp_millis() as u64;

// garbage collect the used sequence number pool
self.used_sequence_number_pool.gc(epoch_ms_now);

// garbage collect the transactions in flight
{
// unwrap because failure indicates poisoned lock
let mut transactions_in_flight = self.transactions_in_flight.write().unwrap();
transactions_in_flight.gc(epoch_ms_now);
}

// garbage collect the core mempool
self.core_mempool.gc();

self.last_gc = now;

Ok(())
}

/// Pipes a batch of transactions from the mempool to the transaction channel.
/// todo: it may be wise to move the batching logic up a level to the consuming structs.
pub(crate) async fn tick(&mut self) -> Result<(), Error> {
if self.last_gc.elapsed() >= GC_INTERVAL {
self.gc().await?;
}

let next = self.mempool_client_receiver.next().await;
if let Some(request) = next {
match request {
Expand Down Expand Up @@ -132,27 +159,6 @@ impl TransactionPipe {
return Err(Error::InputClosed);
}

if self.last_gc.elapsed() >= GC_INTERVAL {
// todo: these will be slightly off, but gc does not need to be exact
let now = Instant::now();
let epoch_ms_now = chrono::Utc::now().timestamp_millis() as u64;

// garbage collect the used sequence number pool
self.used_sequence_number_pool.gc(epoch_ms_now);

// garbage collect the transactions in flight
{
// unwrap because failure indicates poisoned lock
let mut transactions_in_flight = self.transactions_in_flight.write().unwrap();
transactions_in_flight.gc(epoch_ms_now);
}

// garbage collect the core mempool
self.core_mempool.gc();

self.last_gc = now;
}

Ok(())
}

Expand Down Expand Up @@ -270,7 +276,7 @@ impl TransactionPipe {
debug!("Adding transaction to mempool: {:?} {:?}", transaction, sequence_number);
let status = self.core_mempool.add_txn(
transaction.clone(),
0,
tx_result.score(),
sequence_number,
TimelineState::NonQualified,
true,
Expand Down Expand Up @@ -312,6 +318,18 @@ impl TransactionPipe {
// report status
Ok((status, None))
}

/// Sets the last gc time to a provided time
#[cfg(test)]
async fn set_last_gc(&mut self, last_gc: Instant) {
self.last_gc = last_gc;
}

/// Unsets the sequence number for a given account
#[cfg(test)]
fn unset_sequence_number(&mut self, account_address: &AccountAddress) {
self.used_sequence_number_pool.remove_sequence_number(account_address);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -614,4 +632,83 @@ mod tests {

Ok(())
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_gc_before_validating() -> Result<(), anyhow::Error> {
// create a transaction pipe
let (tx_sender, _tx_receiver) = mpsc::channel(16);
let (executor, _tempdir) = Executor::try_test_default(GENESIS_KEYPAIR.0.clone())?;
let (context, background) = executor.background(tx_sender)?;
let mut transaction_pipe = background.into_transaction_pipe();

// generate a transaction with sequence number 1, to which we will try to submit three times
let transaction = create_signed_transaction(1, &context.config().chain);

let start = Instant::now();
// send the transaction to the mempool, expect success
let submission_result = transaction_pipe.submit_transaction(transaction).await?;
match submission_result.0.code {
MempoolStatusCode::Accepted => {
info!("Transaction accepted");
}
_ => {
return Err(anyhow::anyhow!(
"Expected transaction to be accepted, but got {:?}",
submission_result.0.code
));
}
}

// send the transaction that uses the same sequence number, expect failure
let transaction = create_signed_transaction(1, &context.config().chain);
let submission_result = transaction_pipe.submit_transaction(transaction).await?;
match submission_result.0.code {
MempoolStatusCode::Accepted => {
return Err(anyhow::anyhow!(
"Expected transaction to be rejected, but got accepted"
));
}
MempoolStatusCode::UnknownStatus => {
return Err(anyhow::anyhow!(
"Expected transaction to be rejected, but got unknown status"
));
}
status => {
info!("Transaction rejected with status: {:?}", status);
}
}

// Test transactions and core mempool use the system clock, so we need to sleep
tokio::time::sleep(Duration::from_secs(30)).await;

// set the last gc time to a time greater than the expiration s.t. the transaction is garbage collected on the next submission
transaction_pipe.set_last_gc(start - (GC_INTERVAL * 2)).await;

// send the transaction to the mempool, expect success because the duplicate has been garbage collected
let transaction = create_signed_transaction(1, &context.config().chain);

// todo: gc needs to be rewritten for the sequence number mempool to respect transaction expiration, see #966
// for now, we will manually remove this
transaction_pipe.unset_sequence_number(&transaction.sender());

// todo: eventually we will want to update this test to check the background task pipe is running. However, because of limited instrumentation, we will reserve this for a future PR.
// Manually run the garbage collection
transaction_pipe.gc().await?;

/*let submission_result = transaction_pipe.submit_transaction(transaction).await?;
match submission_result.0.code {
MempoolStatusCode::Accepted => {
info!("Transaction accepted");
}
_ => {
return Err(anyhow::anyhow!(
"Expected transaction to be accepted, but got {:?}",
submission_result.0.code
));
}
}*/

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl Executor {

// replace the db path with the temporary directory
maptos_config.chain.maptos_db_path.replace(tempdir.path().to_path_buf());

let executor = Self::try_from_config(maptos_config)?;
Ok((executor, tempdir))
}
Expand Down
Loading