Skip to content

Commit

Permalink
Merge common impls of submit_one_event into submit_event_chain (#551
Browse files Browse the repository at this point in the history
)

Co-authored-by: Glauber Costa <[email protected]>
  • Loading branch information
vmingchen and Glauber Costa authored Apr 25, 2024
1 parent eea418d commit 937b610
Showing 1 changed file with 54 additions and 54 deletions.
108 changes: 54 additions & 54 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use std::{
use crate::{
free_list::{FreeList, Idx},
iou,
iou::sqe::{FsyncFlags, SockAddrStorage, StatxFlags, StatxMode, SubmissionFlags, TimeoutFlags},
iou::{
sqe::{FsyncFlags, SockAddrStorage, StatxFlags, StatxMode, SubmissionFlags, TimeoutFlags},
IoUring,
},
sys::{
self,
blocking::{BlockingThreadOp, BlockingThreadPool},
Expand Down Expand Up @@ -480,7 +483,7 @@ fn record_stats<Ring: UringCommon>(
}
}

// Find the next complete chain of events from the queue
// Find the next complete chain of events from the queue.
// Returns None if the queue is empty.
fn peek_one_chain(queue: &VecDeque<UringDescriptor>, ring_size: usize) -> Option<Range<usize>> {
if queue.is_empty() {
Expand Down Expand Up @@ -532,6 +535,41 @@ fn extract_one_chain(
.collect()
}

// Submit the next complete chain of events from the queue if available.
fn submit_event_chain(
source_map: &mut SourceMap,
ring: &mut IoUring,
allocator: Rc<UringBufferAllocator>,
queue: &mut VecDeque<UringDescriptor>,
ring_size: usize,
) -> Option<bool> {
let now = Instant::now();

while let Some(chain) = peek_one_chain(queue, ring_size) {
return if let Some(sqes) = ring.sq().prepare_sqes(chain.len() as u32) {
let ops = extract_one_chain(source_map, queue, chain, now);
if ops.is_empty() {
// all the sources in the ring were cancelled
continue;
}

for (op, mut sqe) in ops.into_iter().zip(sqes.into_iter()) {
let allocator = allocator.clone();
fill_sqe(
&mut sqe,
&op,
move |size| allocator.new_buffer(size),
source_map,
);
}
Some(true)
} else {
None
};
}
Some(false)
}

fn process_one_event<F, R>(
cqe: Option<iou::CQE>,
try_process: F,
Expand Down Expand Up @@ -854,32 +892,13 @@ impl UringCommon for PollRing {
}

fn submit_one_event(&mut self, queue: &mut VecDeque<UringDescriptor>) -> Option<bool> {
let source_map = &mut *self.source_map.borrow_mut();
let now = Instant::now();

while let Some(chain) = peek_one_chain(queue, self.size) {
return if let Some(sqes) = self.ring.sq().prepare_sqes(chain.len() as u32) {
let ops = extract_one_chain(source_map, queue, chain, now);
if ops.is_empty() {
// all the sources in the ring were cancelled
continue;
}

for (op, mut sqe) in ops.into_iter().zip(sqes.into_iter()) {
let allocator = self.allocator.clone();
fill_sqe(
&mut sqe,
&op,
move |size| allocator.new_buffer(size),
source_map,
);
}
Some(true)
} else {
None
};
}
Some(false)
submit_event_chain(
&mut *self.source_map.borrow_mut(),
&mut self.ring,
self.allocator.clone(),
queue,
self.size,
)
}
}

Expand Down Expand Up @@ -1186,32 +1205,13 @@ impl UringCommon for SleepableRing {
}

fn submit_one_event(&mut self, queue: &mut VecDeque<UringDescriptor>) -> Option<bool> {
let source_map = &mut *self.source_map.borrow_mut();
let now = Instant::now();

while let Some(chain) = peek_one_chain(queue, self.size) {
return if let Some(sqes) = self.ring.sq().prepare_sqes(chain.len() as u32) {
let ops = extract_one_chain(source_map, queue, chain, now);
if ops.is_empty() {
// all the sources in the ring were cancelled
continue;
}

for (op, mut sqe) in ops.into_iter().zip(sqes.into_iter()) {
let allocator = self.allocator.clone();
fill_sqe(
&mut sqe,
&op,
move |size| allocator.new_buffer(size),
source_map,
);
}
Some(true)
} else {
None
};
}
Some(false)
submit_event_chain(
&mut *self.source_map.borrow_mut(),
&mut self.ring,
self.allocator.clone(),
queue,
self.size,
)
}
}

Expand Down

0 comments on commit 937b610

Please sign in to comment.