diff --git a/ibverbs/src/lib.rs b/ibverbs/src/lib.rs index 02d4908..07c4fc8 100644 --- a/ibverbs/src/lib.rs +++ b/ibverbs/src/lib.rs @@ -549,15 +549,24 @@ pub struct QueuePairBuilder<'res> { qp_type: ffi::ibv_qp_type::Type, // carried along to handshake phase - access: ffi::ibv_access_flags, - timeout: u8, - retry_count: u8, - rnr_retry: u8, - min_rnr_timer: u8, - max_rd_atomic: u8, - max_dest_rd_atomic: u8, - path_mtu: u32, - rq_psn: u32, + /// only valid for RC and UC + access: Option, + /// only valid for RC + timeout: Option, + /// only valid for RC + retry_count: Option, + /// only valid for RC + rnr_retry: Option, + /// only valid for RC + min_rnr_timer: Option, + /// only valid for RC + max_rd_atomic: Option, + /// only valid for RC + max_dest_rd_atomic: Option, + /// only valid for RC and UC + path_mtu: Option, + /// only valid for RC and UC + rq_psn: Option, } impl<'res> QueuePairBuilder<'res> { @@ -601,38 +610,60 @@ impl<'res> QueuePairBuilder<'res> { qp_type, - access: ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE, - min_rnr_timer: 16, - retry_count: 6, - rnr_retry: 6, - timeout: 4, - max_rd_atomic: 1, - max_dest_rd_atomic: 1, - path_mtu: pd.ctx.port_attr.active_mtu, - rq_psn: 0, + access: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC + || qp_type == ffi::ibv_qp_type::IBV_QPT_UC) + .then_some(ffi::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE), + min_rnr_timer: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(16), + retry_count: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6), + rnr_retry: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(6), + timeout: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(4), + max_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1), + max_dest_rd_atomic: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC).then_some(1), + path_mtu: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC + || qp_type == ffi::ibv_qp_type::IBV_QPT_UC) + .then_some(pd.ctx.port_attr.active_mtu), + rq_psn: (qp_type == ffi::ibv_qp_type::IBV_QPT_RC + || qp_type == ffi::ibv_qp_type::IBV_QPT_UC) + .then_some(0), } } /// Set the access flags for the new `QueuePair`. /// + /// Valid only for RC and UC QPs. + /// /// Defaults to `IBV_ACCESS_LOCAL_WRITE`. pub fn set_access(&mut self, access: ffi::ibv_access_flags) -> &mut Self { - self.access = access; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC + && self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC + { + panic!("Setting the access flags is only possible on RC and UC Queue Pairs."); + } + self.access = Some(access); self } /// Set the access flags of the new `QueuePair` such that it allows remote reads and writes. + /// + /// Valid only for RC and UC QPs. pub fn allow_remote_rw(&mut self) -> &mut Self { - self.access = self.access - | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE - | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC + && self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC + { + panic!("Setting the access flags is only possible on RC and UC Queue Pairs."); + } + self.access = Some( + self.access.unwrap() + | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE + | ffi::ibv_access_flags::IBV_ACCESS_REMOTE_READ, + ); self } /// Sets the minimum RNR NAK Timer Field Value for the new `QueuePair`. /// /// Defaults to 16 (2.56 ms delay). - /// Relevant only for RC QPs. + /// Valid only for RC QPs. /// /// When an incoming message to this QP should consume a Work Request from the Receive Queue, /// but no Work Request is outstanding on that Queue, the QP will send an RNR NAK packet to @@ -672,7 +703,10 @@ impl<'res> QueuePairBuilder<'res> { /// - 30 - 327.68 ms delay /// - 31 - 491.52 ms delay pub fn set_min_rnr_timer(&mut self, timer: u8) -> &mut Self { - self.min_rnr_timer = timer; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the RNR timer value is only possible on RC Queue Pairs."); + } + self.min_rnr_timer = Some(timer); self } @@ -680,7 +714,7 @@ impl<'res> QueuePairBuilder<'res> { /// retransmitting the packet. /// /// Defaults to 4 (65.536µs). - /// Relevant only to RC QPs. + /// Valid only for RC QPs. /// /// The value zero is special value that waits an infinite time for the ACK/NACK (useful /// for debugging). This means that if any packet in a message is being lost and no ACK or NACK @@ -721,7 +755,10 @@ impl<'res> QueuePairBuilder<'res> { /// - 30 - 4400 s /// - 31 - 8800 s pub fn set_timeout(&mut self, timeout: u8) -> &mut Self { - self.timeout = timeout; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the timeout is only possible on RC Queue Pairs."); + } + self.timeout = Some(timeout); self } @@ -729,13 +766,17 @@ impl<'res> QueuePairBuilder<'res> { /// before reporting an error because the remote side doesn't answer in the primary path. /// /// This 3 bit value defaults to 6. + /// Valid only for RC QPs. /// /// # Panics /// /// Panics if a count higher than 7 is given. pub fn set_retry_count(&mut self, count: u8) -> &mut Self { + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the retry count is only possible on RC Queue Pairs."); + } assert!(count <= 7); - self.retry_count = count; + self.retry_count = Some(count); self } @@ -744,21 +785,29 @@ impl<'res> QueuePairBuilder<'res> { /// /// This 3 bit value defaults to 6. The value 7 is special and specify to retry sending the /// message indefinitely when a RNR Nack is being sent by remote side. + /// Valid only for RC QPs. /// /// # Panics /// /// Panics if a limit higher than 7 is given. pub fn set_rnr_retry(&mut self, n: u8) -> &mut Self { + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the RNR retry count is only possible on RC Queue Pairs."); + } assert!(n <= 7); - self.rnr_retry = n; + self.rnr_retry = Some(n); self } /// Set the number of outstanding RDMA reads & atomic operations on the destination Queue Pair. /// /// This defaults to 1. + /// Valid only for RC QPs. pub fn set_max_rd_atomic(&mut self, max_rd_atomic: u8) -> &mut Self { - self.max_rd_atomic = max_rd_atomic; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the number of outstanding RDMA reads & atomic operations is only possible on RC Queue Pairs."); + } + self.max_rd_atomic = Some(max_rd_atomic); self } @@ -766,7 +815,10 @@ impl<'res> QueuePairBuilder<'res> { /// /// This defaults to 1. pub fn set_max_dest_rd_atomic(&mut self, max_dest_rd_atomic: u8) -> &mut Self { - self.max_dest_rd_atomic = max_dest_rd_atomic; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC { + panic!("Setting the responder resources for handling incoming RDMA reads & atomic operations is only possible on RC Queue Pairs."); + } + self.max_dest_rd_atomic = Some(max_dest_rd_atomic); self } @@ -780,8 +832,13 @@ impl<'res> QueuePairBuilder<'res> { /// - 4: 2048 /// - 5: 4096 pub fn set_path_mtu(&mut self, path_mtu: u32) -> &mut Self { + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC + && self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC + { + panic!("Setting the path MTU is only possible on RC and UC Queue Pairs."); + } assert!((1..=5).contains(&path_mtu)); - self.path_mtu = path_mtu; + self.path_mtu = Some(path_mtu); self } @@ -789,7 +846,12 @@ impl<'res> QueuePairBuilder<'res> { /// /// Defaults to 0. pub fn set_rq_psn(&mut self, rq_psn: u32) -> &mut Self { - self.rq_psn = rq_psn; + if self.qp_type != ffi::ibv_qp_type::IBV_QPT_RC + && self.qp_type != ffi::ibv_qp_type::IBV_QPT_UC + { + panic!("Setting the receive queue's PSN is only possible on RC and UC Queue Pairs."); + } + self.rq_psn = Some(rq_psn); self } @@ -885,15 +947,24 @@ pub struct PreparedQueuePair<'res> { qp: QueuePair<'res>, // carried from builder - access: ffi::ibv_access_flags, - min_rnr_timer: u8, - timeout: u8, - retry_count: u8, - rnr_retry: u8, - max_rd_atomic: u8, - max_dest_rd_atomic: u8, - path_mtu: u32, - rq_psn: u32, + /// only valid for RC and UC + access: Option, + /// only valid for RC + min_rnr_timer: Option, + /// only valid for RC + timeout: Option, + /// only valid for RC + retry_count: Option, + /// only valid for RC + rnr_retry: Option, + /// only valid for RC + max_rd_atomic: Option, + /// only valid for RC + max_dest_rd_atomic: Option, + /// only valid for RC and UC + path_mtu: Option, + /// only valid for RC and UC + rq_psn: Option, } /// A Global identifier for ibv. @@ -1029,15 +1100,17 @@ impl<'res> PreparedQueuePair<'res> { // init and associate with port let mut attr = ffi::ibv_qp_attr { qp_state: ffi::ibv_qp_state::IBV_QPS_INIT, - qp_access_flags: self.access.0, pkey_index: 0, port_num: PORT_NUM, ..Default::default() }; - let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE + let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_PKEY_INDEX - | ffi::ibv_qp_attr_mask::IBV_QP_PORT - | ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS; + | ffi::ibv_qp_attr_mask::IBV_QP_PORT; + if let Some(access) = self.access { + attr.qp_access_flags = access.0; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_ACCESS_FLAGS; + } let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno)); @@ -1046,11 +1119,9 @@ impl<'res> PreparedQueuePair<'res> { // set ready to receive let mut attr = ffi::ibv_qp_attr { qp_state: ffi::ibv_qp_state::IBV_QPS_RTR, - path_mtu: self.path_mtu, + // TODO: this is only valid for RC and UC dest_qp_num: remote.num, - rq_psn: self.rq_psn, - max_dest_rd_atomic: self.max_dest_rd_atomic, - min_rnr_timer: self.min_rnr_timer, + // TODO: this is only valid for RC and UC ah_attr: ffi::ibv_ah_attr { dlid: remote.lid, sl: 0, @@ -1066,13 +1137,25 @@ impl<'res> PreparedQueuePair<'res> { attr.ah_attr.grh.dgid = gid.into(); attr.ah_attr.grh.hop_limit = 0xff; } - let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE + let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_AV - | ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU - | ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN - | ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN - | ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC - | ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER; + | ffi::ibv_qp_attr_mask::IBV_QP_DEST_QPN; + if let Some(max_dest_rd_atomic) = self.max_dest_rd_atomic { + attr.max_dest_rd_atomic = max_dest_rd_atomic; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_DEST_RD_ATOMIC; + } + if let Some(min_rnr_timer) = self.min_rnr_timer { + attr.min_rnr_timer = min_rnr_timer; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_MIN_RNR_TIMER; + } + if let Some(path_mtu) = self.path_mtu { + attr.path_mtu = path_mtu; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_PATH_MTU; + } + if let Some(rq_psn) = self.rq_psn { + attr.rq_psn = rq_psn; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_RQ_PSN; + } let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno)); @@ -1081,19 +1164,26 @@ impl<'res> PreparedQueuePair<'res> { // set ready to send let mut attr = ffi::ibv_qp_attr { qp_state: ffi::ibv_qp_state::IBV_QPS_RTS, - timeout: self.timeout, - retry_cnt: self.retry_count, sq_psn: 0, - rnr_retry: self.rnr_retry, - max_rd_atomic: self.max_rd_atomic, ..Default::default() }; - let mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE - | ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT - | ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT - | ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN - | ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY - | ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC; + let mut mask = ffi::ibv_qp_attr_mask::IBV_QP_STATE | ffi::ibv_qp_attr_mask::IBV_QP_SQ_PSN; + if let Some(timeout) = self.timeout { + attr.timeout = timeout; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_TIMEOUT; + } + if let Some(retry_count) = self.retry_count { + attr.retry_cnt = retry_count; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_RETRY_CNT; + } + if let Some(rnr_retry) = self.rnr_retry { + attr.rnr_retry = rnr_retry; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_RNR_RETRY; + } + if let Some(max_rd_atomic) = self.max_rd_atomic { + attr.max_rd_atomic = max_rd_atomic; + mask |= ffi::ibv_qp_attr_mask::IBV_QP_MAX_QP_RD_ATOMIC; + } let errno = unsafe { ffi::ibv_modify_qp(self.qp.qp, &mut attr as *mut _, mask.0 as i32) }; if errno != 0 { return Err(io::Error::from_raw_os_error(errno));