Skip to content

Commit

Permalink
[scheduler] Enhancement: Add background coroutine group
Browse files Browse the repository at this point in the history
  • Loading branch information
iyzhang committed Jan 21, 2025
1 parent 0647582 commit d202a70
Show file tree
Hide file tree
Showing 35 changed files with 638 additions and 1,243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@

// Receive FIN segment.
+.1 TCP < F. seq 1(0) ack 1002 win 65535 <nop>

// Succeed to close connection immediately because we set linger to 0.
+0 wait(500, ...) = 0

// Send ACK on FIN segment.
+.0 TCP > . seq 1002(0) ack 2 win 65534 <nop>

// Succeed to close connection after 2 MLS.
+240 wait(500, ...) = 0
6 changes: 4 additions & 2 deletions network_simulator/input/tcp/close/close-local.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

// Receive FIN segment.
+.1 TCP < F. seq 1(0) ack 2 win 65535 <nop>

// Succeed to close connection immediately because we have linger set to 0.
+0 wait(500, ...) = 0

// Send ACK on FIN segment.
+.0 TCP > . seq 2(0) ack 2 win 65534 <nop>

// Succeed to close connection after 2 MLS.
+240 wait(500, ...) = 0
3 changes: 3 additions & 0 deletions network_simulator/input/tcp/close/close-out-of-order-fin.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
// Receive data packet
+.1 TCP < P. seq 1(1000) ack 1001 win 65535 <nop>

// Send finished.
+.0 wait(500, ...) = 0

// Send ACK packet for data and FIN.
+.0 TCP > . seq 1001(0) ack 1002 win 64534 <nop>

Expand Down
5 changes: 3 additions & 2 deletions network_simulator/input/tcp/close/close-simultaneous.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
// Receive ACK on FIN segment.
+.1 TCP < F. seq 1(0) ack 2 win 65535 <nop>

// Succeed to close connection immediately because we have linger set to 0.
+0 wait(500, ...) = 0

// Send ACK on FIN segment.
+.0 TCP > . seq 2(0) ack 2 win 65534 <nop>

// Succeed to close connection after 2 MLS.
+240 wait(500, ...) = 0
5 changes: 3 additions & 2 deletions network_simulator/input/tcp/pop/pop-blocking.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

// Receive data packet.
+.1 TCP < P. seq 1(1000) ack 1 win 65535 <nop>
// Send ACK packet.
+.6 TCP > . seq 1(0) ack 1001 win 65535 <nop>

// Data read.
+.0 wait(501, ...) = 0

// Send ACK packet.
+.6 TCP > . seq 1(0) ack 1001 win 65535 <nop>
5 changes: 3 additions & 2 deletions network_simulator/input/tcp/pop/pop-push-blocking.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

// Receive data packet.
+.1 TCP < P. seq 1(1000) ack 1 win 65535 <nop>
// Send ACK packet.
+.6 TCP > . seq 1(0) ack 1001 win 65535 <nop>

// Data read.
+.0 wait(501, ...) = 0

// Send ACK packet.
+.6 TCP > . seq 1(0) ack 1001 win 65535 <nop>

// Send data.
+.1 write(501, ..., 1000) = 1000

Expand Down
5 changes: 3 additions & 2 deletions network_simulator/input/tcp/push/push-pop-blocking.pkt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@

// Receive data packet.
+.1 TCP < P. seq 1(1000) ack 1001 win 65535 <nop>
// Send ACK on data packet.
+.6 TCP > . seq 1001(0) ack 1001 win 64535 <nop>

// Data read.
+.0 wait(501, ...) = 0

// Send ACK on data packet.
+.6 TCP > . seq 1001(0) ack 1001 win 64535 <nop>
2 changes: 1 addition & 1 deletion src/rust/catnap/linux/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl SharedCatnapTransport {
options: TcpSocketOptions::new(config)?,
}));
let mut me2: Self = me.clone();
runtime.insert_background_coroutine(
runtime.insert_io_polling_coroutine(
"bgc::catnap::transport::epoll",
Box::pin(async move { me2.poll().await }.fuse()),
)?;
Expand Down
34 changes: 18 additions & 16 deletions src/rust/catnap/win/overlapped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,10 @@ mod tests {
})
.fuse();

let server_task: QToken = runtime.insert_io_coroutine("ioc_server", Box::pin(server)).unwrap();
ensure!(runtime.run_any(&[server_task], Duration::ZERO).is_none());
let server_task: QToken = runtime.insert_coroutine("ioc_server", None, Box::pin(server)).unwrap();
ensure!(runtime
.wait(server_task, Duration::ZERO)
.is_err_and(|e| e.errno == libc::ETIMEDOUT));
post_completion(&iocp, overlapped.as_mut().marshal(), COMPLETION_KEY)?;

iocp.process_events()?;
Expand All @@ -491,7 +493,7 @@ mod tests {
"completion key not updated"
);

ensure!(runtime.run_any(&[server_task], Duration::ZERO).is_some());
ensure!(runtime.wait(server_task, Duration::ZERO).is_ok());

Ok(())
}
Expand Down Expand Up @@ -585,17 +587,17 @@ mod tests {
);

let mut runtime: SharedDemiRuntime = SharedDemiRuntime::default();
let server_task: QToken = runtime.insert_io_coroutine("ioc_server", server).unwrap();
let server_task: QToken = runtime.insert_coroutine("ioc_server", None, server).unwrap();

let mut wait_for_state = |state| -> Result<(), Fail> {
while server_state_view.load(Ordering::Relaxed) < state {
iocp.get_mut().process_events()?;
if let Some(result) = runtime.run_any(&[server_task], Duration::ZERO) {
return match result {
(_, _, OperationResult::Failed(e)) => Err(e),
_ => Err(Fail::new(libc::EFAULT, "server completed early unexpectedly")),
};
}
match runtime.wait(server_task, Duration::ZERO) {
Err(e) if e.errno == libc::ETIMEDOUT => (),
Err(e) => return Err(e),
Ok((_, OperationResult::Failed(e))) => return Err(e),
_ => return Err(Fail::new(libc::EFAULT, "server completed early unexpectedly")),
};
}

Ok(())
Expand Down Expand Up @@ -631,8 +633,7 @@ mod tests {

let result: OperationResult = loop {
iocp.get_mut().process_events()?;
runtime.poll();
if let Some((_, result)) = runtime.get_completed_task(&server_task) {
if let Ok((_, result)) = runtime.wait(server_task, Duration::ZERO) {
break result;
}
};
Expand Down Expand Up @@ -691,7 +692,7 @@ mod tests {
.fuse();

let mut runtime: SharedDemiRuntime = SharedDemiRuntime::default();
let server_task: QToken = runtime.insert_io_coroutine("ioc_server", Box::pin(server)).unwrap();
let server_task: QToken = runtime.insert_coroutine("ioc_server", None, Box::pin(server)).unwrap();

ensure!(
server_state_view.load(Ordering::Relaxed) < 1,
Expand All @@ -701,7 +702,9 @@ mod tests {
let iocp_ref: &mut IoCompletionPort<()> = unsafe { &mut *iocp.get() };
iocp_ref.process_events()?;
ensure!(
runtime.run_any(&[server_task], Duration::ZERO).is_none(),
runtime
.wait(server_task, Duration::ZERO)
.is_err_and(|e| e.errno == libc::ETIMEDOUT),
"server should not be done"
);

Expand All @@ -710,8 +713,7 @@ mod tests {
// Move time forward, which should time out the operation.
runtime.advance_clock(Instant::now());
iocp.get_mut().process_events()?;
if let Some((i, _, result)) = runtime.run_any(&[server_task], Duration::ZERO) {
ensure_eq!(i, 0);
if let Ok((_, result)) = runtime.wait(server_task, Duration::ZERO) {
break result;
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/rust/catnap/win/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl SharedCatnapTransport {
runtime: runtime.clone(),
}));

runtime.insert_background_coroutine(
runtime.insert_io_polling_coroutine(
"bgc::catnap::transport::epoll",
Box::pin({
let mut me: Self = me.clone();
Expand Down
1 change: 1 addition & 0 deletions src/rust/collections/id_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const MAX_RETRIES_ID_ALLOC: usize = 500;
/// This data structure is a general-purpose map for obfuscating ids from external modules. It takes an external id type
/// and an internal id type and translates between the two. The ID types must be basic types that can be converted back
/// and forth between u64 and therefore each other.
#[derive(Debug)]
pub struct IdMap<E: Eq + Hash + From<u64> + Into<u64> + Copy, I: From<u64> + Into<u64> + Copy> {
/// Map between external and internal ids.
ids: HashMap<E, I>,
Expand Down
33 changes: 0 additions & 33 deletions src/rust/demikernel/libos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -144,8 +142,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -157,8 +153,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -169,8 +163,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -183,8 +175,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -198,8 +188,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -211,8 +199,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -224,8 +210,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -243,8 +227,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -255,8 +237,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -268,8 +248,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -282,8 +260,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand All @@ -305,8 +281,6 @@ impl LibOS {
}
};

self.poll();

result
}

Expand Down Expand Up @@ -363,11 +337,4 @@ impl LibOS {

result
}

pub fn poll(&mut self) {
// No profiling scope here because we may enter a coroutine scope.
match self {
LibOS::NetworkLibOS(libos) => libos.poll(),
}
}
}
Loading

0 comments on commit d202a70

Please sign in to comment.