Skip to content

Commit

Permalink
txview: run status and age checks on incoming transactions (#4506)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Feb 13, 2025
1 parent 52cf690 commit 54da9b2
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
);

// Push unschedulables back into the queue
for id in self.unschedulables.drain(..) {
container.push_id_into_queue(id);
}
container.push_ids_into_queue(self.unschedulables.drain(..));

Ok(SchedulingSummary {
num_scheduled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,13 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);

// Push unschedulable ids back into the container
for id in unschedulable_ids {
container.push_id_into_queue(id);
}
container.push_ids_into_queue(unschedulable_ids.into_iter());

// Push remaining transactions back into the container
while let Some((id, _)) = self.prio_graph.pop_and_unblock() {
container.push_id_into_queue(id);
}
container.push_ids_into_queue(std::iter::from_fn(|| {
self.prio_graph.pop_and_unblock().map(|(id, _)| id)
}));

// No more remaining items in the queue.
// Clear here to make sure the next scheduling pass starts fresh
// without detecting any conflicts.
Expand Down
123 changes: 102 additions & 21 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use {
super::{
scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
transaction_priority_id::TransactionPriorityId,
transaction_state::TransactionState,
transaction_state_container::{
SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer,
EXTRA_CAPACITY,
},
},
crate::banking_stage::{
Expand Down Expand Up @@ -406,8 +408,69 @@ impl TransactionViewReceiveAndBuffer {

let mut num_received = 0usize;
let mut num_buffered = 0usize;
let mut num_dropped_on_status_age_checks = 0usize;
let mut num_dropped_on_capacity = 0usize;
let mut num_dropped_on_receive = 0usize;

// Create temporary batches of transactions to be age-checked.
let mut transaction_priority_ids = ArrayVec::<_, EXTRA_CAPACITY>::new();
let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(()));
let mut error_counters = TransactionErrorMetrics::default();

let mut check_and_push_to_queue =
|container: &mut TransactionViewStateContainer,
transaction_priority_ids: &mut ArrayVec<TransactionPriorityId, 64>| {
// Temporary scope so that transaction references are immediately
// dropped and transactions not passing
let mut check_results = {
let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new();
transactions.extend(transaction_priority_ids.iter().map(|priority_id| {
&container
.get_transaction_ttl(priority_id.id)
.expect("transaction must exist")
.transaction
}));
working_bank.check_transactions::<RuntimeTransaction<_>>(
&transactions,
&lock_results[..transactions.len()],
MAX_PROCESSING_AGE,
&mut error_counters,
)
};

// Remove errored transactions
for (result, priority_id) in check_results
.iter_mut()
.zip(transaction_priority_ids.iter())
{
if result.is_err() {
num_dropped_on_status_age_checks += 1;
container.remove_by_id(priority_id.id);
}
let transaction = &container
.get_transaction_ttl(priority_id.id)
.expect("transaction must exist")
.transaction;
if let Err(err) = Consumer::check_fee_payer_unlocked(
working_bank,
transaction,
&mut error_counters,
) {
*result = Err(err);
num_dropped_on_status_age_checks += 1;
container.remove_by_id(priority_id.id);
}
}
// Push non-errored transaction into queue.
num_dropped_on_capacity += container.push_ids_into_queue(
check_results
.into_iter()
.zip(transaction_priority_ids.drain(..))
.filter(|(r, _)| r.is_ok())
.map(|(_, id)| id),
);
};

for packet_batch in packet_batch_message.iter() {
for packet in packet_batch.iter() {
let Some(packet_data) = packet.data(..) else {
Expand All @@ -417,38 +480,56 @@ impl TransactionViewReceiveAndBuffer {
num_received += 1;

// Reserve free-space to copy packet into, run sanitization checks, and insert.
if container.try_insert_with_data(
packet_data,
|bytes| match Self::try_handle_packet(
bytes,
root_bank,
working_bank,
alt_resolved_slot,
sanitized_epoch,
transaction_account_lock_limit,
) {
Ok(state) => {
num_buffered += 1;
Ok(state)
if let Some(transaction_id) =
container.try_insert_map_only_with_data(packet_data, |bytes| {
match Self::try_handle_packet(
bytes,
root_bank,
working_bank,
alt_resolved_slot,
sanitized_epoch,
transaction_account_lock_limit,
) {
Ok(state) => {
num_buffered += 1;
Ok(state)
}
Err(()) => {
num_dropped_on_receive += 1;
Err(())
}
}
Err(()) => {
num_dropped_on_receive += 1;
Err(())
}
},
) {
num_dropped_on_capacity += 1;
};
})
{
let priority = container
.get_mut_transaction_state(transaction_id)
.expect("transaction must exist")
.priority();
transaction_priority_ids
.push(TransactionPriorityId::new(priority, transaction_id));

// If at capacity, run checks and remove invalid transactions.
if transaction_priority_ids.len() == EXTRA_CAPACITY {
check_and_push_to_queue(container, &mut transaction_priority_ids);
}
}
}
}

// Any remaining packets undergo status/age checks
check_and_push_to_queue(container, &mut transaction_priority_ids);

let buffer_time_us = start.elapsed().as_micros() as u64;
timing_metrics.update(|timing_metrics| {
saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
});
count_metrics.update(|count_metrics| {
saturating_add_assign!(count_metrics.num_received, num_received);
saturating_add_assign!(count_metrics.num_buffered, num_buffered);
saturating_add_assign!(
count_metrics.num_dropped_on_age_and_status,
num_dropped_on_status_age_checks
);
saturating_add_assign!(
count_metrics.num_dropped_on_capacity,
num_dropped_on_capacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,8 @@ where
}

if hold {
for priority_id in ids_to_add_back {
self.container.push_id_into_queue(priority_id);
}
self.container
.push_ids_into_queue(ids_to_add_back.into_iter());
} else {
for priority_id in ids_to_add_back {
self.container.remove_by_id(priority_id.id);
Expand Down Expand Up @@ -393,14 +392,22 @@ where
&mut error_counters,
);

for (result, id) in check_results.into_iter().zip(chunk.iter()) {
// Remove errored transactions
for (result, id) in check_results.iter().zip(chunk.iter()) {
if result.is_err() {
saturating_add_assign!(num_dropped_on_age_and_status, 1);
self.container.remove_by_id(id.id);
} else {
self.container.push_id_into_queue(*id);
}
}

// Push non-errored transaction into queue.
self.container.push_ids_into_queue(
check_results
.into_iter()
.zip(chunk.iter())
.filter(|(r, _)| r.is_ok())
.map(|(_, id)| *id),
);
}

self.count_metrics.update(|count_metrics| {
Expand Down
Loading

0 comments on commit 54da9b2

Please sign in to comment.