Skip to content

Commit

Permalink
Set attributes only on queue pairs of the correct type
Browse files Browse the repository at this point in the history
  • Loading branch information
YtvwlD committed Jul 27, 2024
1 parent 608ae95 commit fbdca10
Showing 1 changed file with 155 additions and 65 deletions.
220 changes: 155 additions & 65 deletions ibverbs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,15 +549,24 @@ pub struct QueuePairBuilder<'res> {
qp_type: ffi::ibv_qp_type::Type,

// carried along to handshake phase
access: ffi::ibv_access_flags,
timeout: u8,
retry_count: u8,
rnr_retry: u8,
min_rnr_timer: u8,
max_rd_atomic: u8,
max_dest_rd_atomic: u8,
path_mtu: u32,
rq_psn: u32,
/// only valid for RC and UC
access: Option<ffi::ibv_access_flags>,
/// only valid for RC
timeout: Option<u8>,
/// only valid for RC
retry_count: Option<u8>,
/// only valid for RC
rnr_retry: Option<u8>,
/// only valid for RC
min_rnr_timer: Option<u8>,
/// only valid for RC
max_rd_atomic: Option<u8>,
/// only valid for RC
max_dest_rd_atomic: Option<u8>,
/// only valid for RC and UC
path_mtu: Option<u32>,
/// only valid for RC and UC
rq_psn: Option<u32>,
}

impl<'res> QueuePairBuilder<'res> {
Expand Down Expand Up @@ -601,38 +610,60 @@ impl<'res> QueuePairBuilder<'res> {

qp_type,

access: ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE,
min_rnr_timer: 16,
retry_count: 6,
rnr_retry: 6,
timeout: 4,
max_rd_atomic: 1,
max_dest_rd_atomic: 1,
path_mtu: pd.ctx.port_attr.active_mtu,
rq_psn: 0,
access: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE),
min_rnr_timer: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(16),
retry_count: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6),
rnr_retry: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6),
timeout: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(4),
max_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1),
max_dest_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1),
path_mtu: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(pd.ctx.port_attr.active_mtu),
rq_psn: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC
|| qp_type == ffi::ibv_qp_type::IBV_QPT_UC)
.then_some(0),
}
}

/// Set the access flags for the new `QueuePair`.
///
/// Valid only for RC and UC QPs.
///
/// Defaults to `IBV_ACCESS_LOCAL_WRITE`.
pub fn set_access(&mut self, access: ffi::ibv_access_flags) -> &mut Self {
self.access = access;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC
&& self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC
{
panic!("Setting the access flags is only possible on RC and UC Queue Pairs.");
}
self.access = Some(access);
self
}

/// Set the access flags of the new `QueuePair` such that it allows remote reads and writes.
///
/// Valid only for RC and UC QPs.
pub fn allow_remote_rw(&mut self) -> &mut Self {
self.access = self.access
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC
&& self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC
{
panic!("Setting the access flags is only possible on RC and UC Queue Pairs.");
}
self.access = Some(
self.access.unwrap()
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
| ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ,
);
self
}

/// Sets the minimum RNR NAK Timer Field Value for the new `QueuePair`.
///
/// Defaults to 16 (2.56 ms delay).
/// Relevant only for RC QPs.
/// Valid only for RC QPs.
///
/// When an incoming message to this QP should consume a Work Request from the Receive Queue,
/// but no Work Request is outstanding on that Queue, the QP will send an RNR NAK packet to
Expand Down Expand Up @@ -672,15 +703,18 @@ impl<'res> QueuePairBuilder<'res> {
/// - 30 - 327.68 ms delay
/// - 31 - 491.52 ms delay
pub fn set_min_rnr_timer(&mut self, timer: u8) -> &mut Self {
self.min_rnr_timer = timer;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the RNR timer value is only possible on RC Queue Pairs.");
}
self.min_rnr_timer = Some(timer);
self
}

/// Sets the minimum timeout that the new `QueuePair` waits for ACK/NACK from remote QP before
/// retransmitting the packet.
///
/// Defaults to 4 (65.536µs).
/// Relevant only to RC QPs.
/// Valid only for RC QPs.
///
/// The value zero is special value that waits an infinite time for the ACK/NACK (useful
/// for debugging). This means that if any packet in a message is being lost and no ACK or NACK
Expand Down Expand Up @@ -721,21 +755,28 @@ impl<'res> QueuePairBuilder<'res> {
/// - 30 - 4400 s
/// - 31 - 8800 s
pub fn set_timeout(&mut self, timeout: u8) -> &mut Self {
self.timeout = timeout;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the timeout is only possible on RC Queue Pairs.");
}
self.timeout = Some(timeout);
self
}

/// Sets the total number of times that the new `QueuePair` will try to resend the packets
/// before reporting an error because the remote side doesn't answer in the primary path.
///
/// This 3 bit value defaults to 6.
/// Valid only for RC QPs.
///
/// # Panics
///
/// Panics if a count higher than 7 is given.
pub fn set_retry_count(&mut self, count: u8) -> &mut Self {
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the retry count is only possible on RC Queue Pairs.");
}
assert!(count <= 7);
self.retry_count = count;
self.retry_count = Some(count);
self
}

Expand All @@ -744,29 +785,40 @@ impl<'res> QueuePairBuilder<'res> {
///
/// This 3 bit value defaults to 6. The value 7 is special and specify to retry sending the
/// message indefinitely when a RNR Nack is being sent by remote side.
/// Valid only for RC QPs.
///
/// # Panics
///
/// Panics if a limit higher than 7 is given.
pub fn set_rnr_retry(&mut self, n: u8) -> &mut Self {
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the RNR retry count is only possible on RC Queue Pairs.");
}
assert!(n <= 7);
self.rnr_retry = n;
self.rnr_retry = Some(n);
self
}

/// Set the number of outstanding RDMA reads & atomic operations on the destination Queue Pair.
///
/// This defaults to 1.
/// Valid only for RC QPs.
pub fn set_max_rd_atomic(&mut self, max_rd_atomic: u8) -> &mut Self {
self.max_rd_atomic = max_rd_atomic;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the number of outstanding RDMA reads & atomic operations is only possible on RC Queue Pairs.");
}
self.max_rd_atomic = Some(max_rd_atomic);
self
}

/// Set the number of responder resources for handling incoming RDMA reads & atomic operations.
///
/// This defaults to 1.
pub fn set_max_dest_rd_atomic(&mut self, max_dest_rd_atomic: u8) -> &mut Self {
self.max_dest_rd_atomic = max_dest_rd_atomic;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC {
panic!("Setting the responder resources for handling incoming RDMA reads & atomic operations is only possible on RC Queue Pairs.");
}
self.max_dest_rd_atomic = Some(max_dest_rd_atomic);
self
}

Expand All @@ -780,16 +832,26 @@ impl<'res> QueuePairBuilder<'res> {
/// - 4: 2048
/// - 5: 4096
pub fn set_path_mtu(&mut self, path_mtu: u32) -> &mut Self {
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC
&& self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC
{
panic!("Setting the path MTU is only possible on RC and UC Queue Pairs.");
}
assert!((1..=5).contains(&path_mtu));
self.path_mtu = path_mtu;
self.path_mtu = Some(path_mtu);
self
}

/// Set the PSN for the receive queue.
///
/// Defaults to 0.
pub fn set_rq_psn(&mut self, rq_psn: u32) -> &mut Self {
self.rq_psn = rq_psn;
if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC
&& self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC
{
panic!("Setting the receive queue's PSN is only possible on RC and UC Queue Pairs.");
}
self.rq_psn = Some(rq_psn);
self
}

Expand Down Expand Up @@ -885,15 +947,24 @@ pub struct PreparedQueuePair<'res> {
qp: QueuePair<'res>,

// carried from builder
access: ffi::ibv_access_flags,
min_rnr_timer: u8,
timeout: u8,
retry_count: u8,
rnr_retry: u8,
max_rd_atomic: u8,
max_dest_rd_atomic: u8,
path_mtu: u32,
rq_psn: u32,
/// only valid for RC and UC
access: Option<ffi::ibv_access_flags>,
/// only valid for RC
min_rnr_timer: Option<u8>,
/// only valid for RC
timeout: Option<u8>,
/// only valid for RC
retry_count: Option<u8>,
/// only valid for RC
rnr_retry: Option<u8>,
/// only valid for RC
max_rd_atomic: Option<u8>,
/// only valid for RC
max_dest_rd_atomic: Option<u8>,
/// only valid for RC and UC
path_mtu: Option<u32>,
/// only valid for RC and UC
rq_psn: Option<u32>,
}

/// A Global identifier for ibv.
Expand Down Expand Up @@ -1029,15 +1100,17 @@ impl<'res> PreparedQueuePair<'res> {
// init and associate with port
let mut attr = ffi::ibv_qp_attr {
qp_state: ffi::ibv_qp_state::IBV_QPS_INIT,
qp_access_flags: self.access.0,
pkey_index: 0,
port_num: PORT_NUM,
..Default::default()
};
let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
| ffi::ibv_qp_attr_mask::IBV_QP_PKEY_INDEX
| ffi::ibv_qp_attr_mask::IBV_QP_PORT
| ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS;
| ffi::ibv_qp_attr_mask::IBV_QP_PORT;
if let Some(access) = self.access {
attr.qp_access_flags = access.0;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS;
}
let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
if errno != 0 {
return Err(io::Error::from_raw_os_error(errno));
Expand All @@ -1046,11 +1119,9 @@ impl<'res> PreparedQueuePair<'res> {
// set ready to receive
let mut attr = ffi::ibv_qp_attr {
qp_state: ffi::ibv_qp_state::IBV_QPS_RTR,
path_mtu: self.path_mtu,
// TODO: this is only valid for RC and UC
dest_qp_num: remote.num,
rq_psn: self.rq_psn,
max_dest_rd_atomic: self.max_dest_rd_atomic,
min_rnr_timer: self.min_rnr_timer,
// TODO: this is only valid for RC and UC
ah_attr: ffi::ibv_ah_attr {
dlid: remote.lid,
sl: 0,
Expand All @@ -1066,13 +1137,25 @@ impl<'res> PreparedQueuePair<'res> {
attr.ah_attr.grh.dgid = gid.into();
attr.ah_attr.grh.hop_limit = 0xff;
}
let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
| ffi::ibv_qp_attr_mask::IBV_QP_AV
| ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU
| ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN
| ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN
| ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC
| ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER;
| ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN;
if let Some(max_dest_rd_atomic) = self.max_dest_rd_atomic {
attr.max_dest_rd_atomic = max_dest_rd_atomic;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC;
}
if let Some(min_rnr_timer) = self.min_rnr_timer {
attr.min_rnr_timer = min_rnr_timer;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER;
}
if let Some(path_mtu) = self.path_mtu {
attr.path_mtu = path_mtu;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU;
}
if let Some(rq_psn) = self.rq_psn {
attr.rq_psn = rq_psn;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN;
}
let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
if errno != 0 {
return Err(io::Error::from_raw_os_error(errno));
Expand All @@ -1081,19 +1164,26 @@ impl<'res> PreparedQueuePair<'res> {
// set ready to send
let mut attr = ffi::ibv_qp_attr {
qp_state: ffi::ibv_qp_state::IBV_QPS_RTS,
timeout: self.timeout,
retry_cnt: self.retry_count,
sq_psn: 0,
rnr_retry: self.rnr_retry,
max_rd_atomic: self.max_rd_atomic,
..Default::default()
};
let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE
| ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT
| ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT
| ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN
| ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY
| ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC;
let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN;
if let Some(timeout) = self.timeout {
attr.timeout = timeout;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT;
}
if let Some(retry_count) = self.retry_count {
attr.retry_cnt = retry_count;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT;
}
if let Some(rnr_retry) = self.rnr_retry {
attr.rnr_retry = rnr_retry;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY;
}
if let Some(max_rd_atomic) = self.max_rd_atomic {
attr.max_rd_atomic = max_rd_atomic;
mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC;
}
let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) };
if errno != 0 {
return Err(io::Error::from_raw_os_error(errno));
Expand Down

0 comments on commit fbdca10

Please sign in to comment.