diff --git a/src/connmgr/batch.rs b/src/connmgr/batch.rs index e28546d0..ccc9a273 100644 --- a/src/connmgr/batch.rs +++ b/src/connmgr/batch.rs @@ -30,6 +30,7 @@ pub struct BatchKey { pub struct BatchGroup<'a, 'b> { addr: &'b [u8], + use_router: bool, ids: arena::ReusableVecHandle<'b, zhttppacket::Id<'a>>, } @@ -38,14 +39,25 @@ impl<'a> BatchGroup<'a, '_> { self.addr } + #[allow(dead_code)] + pub fn use_router(&self) -> bool { + self.use_router + } + pub fn ids(&self) -> &[zhttppacket::Id<'a>] { &self.ids } } +struct AddrItem { + addr: ArrayVec, + use_router: bool, + keys: list::List, +} + pub struct Batch { nodes: Slab>, - addrs: Vec<(ArrayVec, list::List)>, + addrs: Vec, addr_index: usize, group_ids: arena::ReusableVec, last_group_ckeys: Vec, @@ -80,7 +92,7 @@ impl Batch { self.addr_index = 0; } - pub fn add(&mut self, to_addr: &[u8], ckey: usize) -> Result { + pub fn add(&mut self, to_addr: &[u8], use_router: bool, ckey: usize) -> Result { if self.nodes.len() == self.nodes.capacity() { return Err(()); } @@ -94,9 +106,9 @@ impl Batch { let mut pos = self.addrs.len(); - for (i, a) in self.addrs.iter().enumerate() { - if a.0.as_ref() == to_addr { - pos = i; + for (n, ai) in self.addrs.iter().enumerate() { + if ai.addr.as_slice() == to_addr && ai.use_router == use_router { + pos = n; } } @@ -106,9 +118,13 @@ impl Batch { } // connection limits to_addr to FROM_MAX so this is guaranteed to succeed - let a = ArrayVec::try_from(to_addr).unwrap(); + let addr = ArrayVec::try_from(to_addr).unwrap(); - self.addrs.push((a, list::List::default())); + self.addrs.push(AddrItem { + addr, + use_router, + keys: list::List::default(), + }); } else { // adding not allowed if take_group() has already moved past the index if pos < self.addr_index { @@ -117,7 +133,7 @@ impl Batch { } let nkey = self.nodes.insert(list::Node::new(ckey)); - self.addrs[pos].1.push_back(&mut self.nodes, nkey); + self.addrs[pos].keys.push_back(&mut self.nodes, nkey); Ok(BatchKey { addr_index: pos, @@ -127,7 +143,7 @@ impl Batch { pub fn remove(&mut self, key: BatchKey) { self.addrs[key.addr_index] - .1 + .keys .remove(&mut self.nodes, key.nkey); self.nodes.remove(key.nkey); } @@ -141,7 +157,7 @@ impl Batch { while ids.is_empty() { // find the next addr with items - while self.addr_index < addrs.len() && addrs[self.addr_index].1.is_empty() { + while self.addr_index < addrs.len() && addrs[self.addr_index].keys.is_empty() { self.addr_index += 1; } @@ -151,7 +167,7 @@ impl Batch { return None; } - let keys = &mut addrs[self.addr_index].1; + let keys = &mut addrs[self.addr_index].keys; self.last_group_ckeys.clear(); ids.clear(); @@ -173,9 +189,13 @@ impl Batch { } } - let addr = &addrs[self.addr_index].0; + let ai = &addrs[self.addr_index]; - Some(BatchGroup { addr, ids }) + Some(BatchGroup { + addr: &ai.addr, + use_router: ai.use_router, + ids, + }) } pub fn last_group_ckeys(&self) -> &[usize] { @@ -189,20 +209,21 @@ mod tests { #[test] fn add_take() { - let ids = ["id-1", "id-2", "id-3"]; - let mut batch = Batch::new(3); + let ids = ["id-1", "id-2", "id-3", "id-4"]; + let mut batch = Batch::new(4); - assert_eq!(batch.capacity(), 3); + assert_eq!(batch.capacity(), 4); assert_eq!(batch.len(), 0); assert!(batch.last_group_ckeys().is_empty()); - assert!(batch.add(b"addr-a", 1).is_ok()); - assert!(batch.add(b"addr-a", 2).is_ok()); - assert!(batch.add(b"addr-b", 3).is_ok()); - assert_eq!(batch.len(), 3); + assert!(batch.add(b"addr-a", false, 1).is_ok()); + assert!(batch.add(b"addr-a", false, 2).is_ok()); + assert!(batch.add(b"addr-b", false, 3).is_ok()); + assert!(batch.add(b"addr-b", true, 4).is_ok()); + assert_eq!(batch.len(), 4); - assert!(batch.add(b"addr-c", 4).is_err()); - assert_eq!(batch.len(), 3); + assert!(batch.add(b"addr-c", false, 5).is_err()); + assert_eq!(batch.len(), 4); assert_eq!(batch.is_empty(), false); let group = batch @@ -214,6 +235,7 @@ mod tests { assert_eq!(group.ids()[1].id, b"id-2"); assert_eq!(group.ids()[1].seq, Some(0)); assert_eq!(group.addr(), b"addr-a"); + assert!(!group.use_router()); drop(group); assert_eq!(batch.is_empty(), false); assert_eq!(batch.last_group_ckeys(), &[1, 2]); @@ -225,14 +247,27 @@ mod tests { assert_eq!(group.ids()[0].id, b"id-3"); assert_eq!(group.ids()[0].seq, Some(0)); assert_eq!(group.addr(), b"addr-b"); + assert!(!group.use_router()); drop(group); - assert_eq!(batch.is_empty(), true); + assert_eq!(batch.is_empty(), false); assert_eq!(batch.last_group_ckeys(), &[3]); + let group = batch + .take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0))) + .unwrap(); + assert_eq!(group.ids().len(), 1); + assert_eq!(group.ids()[0].id, b"id-4"); + assert_eq!(group.ids()[0].seq, Some(0)); + assert_eq!(group.addr(), b"addr-b"); + assert!(group.use_router()); + drop(group); + assert_eq!(batch.is_empty(), true); + assert_eq!(batch.last_group_ckeys(), &[4]); + assert!(batch .take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0))) .is_none()); - assert_eq!(batch.last_group_ckeys(), &[3]); + assert_eq!(batch.last_group_ckeys(), &[4]); } #[test] @@ -240,8 +275,8 @@ mod tests { let ids = ["id-1", "id-2", "id-3"]; let mut batch = Batch::new(3); - let bkey = batch.add(b"addr-a", 1).unwrap(); - assert!(batch.add(b"addr-b", 2).is_ok()); + let bkey = batch.add(b"addr-a", false, 1).unwrap(); + assert!(batch.add(b"addr-b", false, 2).is_ok()); assert_eq!(batch.len(), 2); batch.remove(bkey); assert_eq!(batch.len(), 1); @@ -256,7 +291,7 @@ mod tests { drop(group); assert_eq!(batch.is_empty(), true); - assert!(batch.add(b"addr-a", 3).is_ok()); + assert!(batch.add(b"addr-a", false, 3).is_ok()); assert_eq!(batch.len(), 1); assert!(!batch.is_empty()); @@ -276,9 +311,9 @@ mod tests { let ids = ["id-1", "id-2", "id-3"]; let mut batch = Batch::new(3); - assert!(batch.add(b"addr-a", 1).is_ok()); - assert!(batch.add(b"addr-b", 2).is_ok()); - assert!(batch.add(b"addr-b", 3).is_ok()); + assert!(batch.add(b"addr-a", false, 1).is_ok()); + assert!(batch.add(b"addr-b", false, 2).is_ok()); + assert!(batch.add(b"addr-b", false, 3).is_ok()); let group = batch .take_group(|ckey| { diff --git a/src/connmgr/client.rs b/src/connmgr/client.rs index 6dbf559e..125a41a7 100644 --- a/src/connmgr/client.rs +++ b/src/connmgr/client.rs @@ -381,7 +381,7 @@ impl Connections { None => return Err(()), }; - let bkey = items.batch.add(addr, ckey)?; + let bkey = items.batch.add(addr, false, ckey)?; ci.batch_key = Some(bkey); diff --git a/src/connmgr/server.rs b/src/connmgr/server.rs index 96672ce5..a86466ce 100644 --- a/src/connmgr/server.rs +++ b/src/connmgr/server.rs @@ -487,7 +487,7 @@ impl Connections { None => return Err(()), }; - let bkey = items.batch.add(addr, ckey)?; + let bkey = items.batch.add(addr, false, ckey)?; ci.batch_key = Some(bkey);