From ce68ff9450f1c6d58ed2e908f7d0002ec8c7ecf6 Mon Sep 17 00:00:00 2001 From: "zhuxiujia@qq.com" Date: Tue, 26 Dec 2023 20:02:34 +0800 Subject: [PATCH] default_max use cpu*4 --- Cargo.toml | 3 ++- src/lib.rs | 16 ++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f9e03f..52dc144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fast_pool" -version = "0.1.0" +version = "0.1.1" edition = "2021" description = "The Fast Pool based on channel" readme = "README.md" @@ -16,4 +16,5 @@ async-trait = "0.1" futures-core = { version = "0.3" } flume = "0.11.0" tokio = {version = "1",features = ["time","rt-multi-thread","macros"]} +num_cpus = {version = "1.16.0"} [dev-dependencies] \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index e627926..e1a5476 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,12 +42,13 @@ pub trait Manager { impl Pool { pub fn new(m: M) -> Self { + let default_max = num_cpus::get() as u64 * 4; let (s, r) = flume::unbounded(); Self { manager: Arc::new(m), sender: s, receiver: r, - max_open: Arc::new(AtomicU64::new(10)), + max_open: Arc::new(AtomicU64::new(default_max)), in_use: Arc::new(AtomicU64::new(0)), } } @@ -103,10 +104,12 @@ impl Pool { max_open: self.max_open.load(Ordering::Relaxed), connections: self.in_use.load(Ordering::Relaxed) + self.sender.len() as u64, in_use: self.in_use.load(Ordering::Relaxed), + idle: self.sender.len() as u64, } } pub fn set_max_open(&self, n: u64) { + self.max_open.store(n, Ordering::SeqCst); let open = self.sender.len() as u64; if open > n { let del = open - n; @@ -114,7 +117,6 @@ impl Pool { _ = self.receiver.try_recv(); } } - self.max_open.store(n, Ordering::SeqCst); } } @@ -142,13 +144,13 @@ impl DerefMut for ConnectionBox { impl Drop for ConnectionBox { fn drop(&mut self) { + self.in_use.fetch_sub(1, Ordering::SeqCst); if let Some(v) = self.inner.take() { let max_open = self.max_open.load(Ordering::SeqCst); - if self.sender.len() < max_open as usize { + if self.sender.len() as u64 + self.in_use.load(Ordering::SeqCst) < max_open { _ = self.sender.send(v); } } - self.in_use.fetch_sub(1, Ordering::SeqCst); } } @@ -156,10 +158,12 @@ impl Drop for ConnectionBox { pub struct State { /// max open limit pub max_open: u64, - ///connections = in_use number + in_pool number + ///connections = in_use number + idle number pub connections: u64, /// user use connection number pub in_use: u64, + /// idle connection + pub idle: u64, } #[cfg(test)] @@ -192,7 +196,7 @@ mod test { #[tokio::test] async fn test_debug() { let p = Pool::new(TestManager {}); - println!("{:?}",p); + println!("{:?}", p); } // --nocapture