From b67b689eff9b425a051a2042ba4b5082bf8aa766 Mon Sep 17 00:00:00 2001 From: "Michael P. Jung" Date: Mon, 14 Feb 2022 23:20:29 +0100 Subject: [PATCH] Add Pool::swap_config method --- Cargo.toml | 1 + src/managed/mod.rs | 43 +++++++++++++++++++++++++++------- tests/managed.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f8153bd..f56f1776 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ deadpool-runtime = { version = "0.1", path = "./runtime" } # `tokio::sync::Semaphore`. No other features of `tokio` are enabled or used # unless the `rt_tokio_1` feature is enabled. tokio = { version = "1.0", features = ["sync"] } +arc-swap = "1.5.0" [dev-dependencies] async-std = { version = "1.0", features = ["attributes"] } diff --git a/src/managed/mod.rs b/src/managed/mod.rs index 205f33ae..d18d79b2 100644 --- a/src/managed/mod.rs +++ b/src/managed/mod.rs @@ -72,11 +72,12 @@ use std::{ ops::{Deref, DerefMut}, sync::{ atomic::{AtomicIsize, Ordering}, - Arc, Mutex, Weak, + Arc, Mutex, MutexGuard, Weak, }, time::{Duration, Instant}, }; +use arc_swap::ArcSwap; use async_trait::async_trait; use deadpool_runtime::Runtime; use retain_mut::RetainMut; @@ -287,7 +288,7 @@ impl>> Pool { }), available: AtomicIsize::new(0), semaphore: Semaphore::new(builder.config.max_size), - config: builder.config, + config: ArcSwap::new(Arc::new(builder.config)), hooks: builder.hooks, runtime: builder.runtime, }), @@ -466,14 +467,24 @@ impl>> Pool { * always reports a `max_size` of 0 for closed pools. */ pub fn resize(&self, max_size: usize) { + self.swap_config(PoolConfig { + max_size, + ..self.inner.config.load() + }); + } + + /// Internal _resize method which uses an already + /// locked mutex. This method is used by the `swap_config` + /// method. + fn _resize(&self, slots: MutexGuard<'_, Slots>>) { if self.inner.semaphore.is_closed() { return; } - let mut slots = self.inner.slots.lock().unwrap(); let old_max_size = slots.max_size; - slots.max_size = max_size; + let new_max_size = self.config().max_size; + slots.max_size = new_max_size; // shrink pool - if max_size < old_max_size { + if new_max_size < old_max_size { while slots.size > slots.max_size { if let Ok(permit) = self.inner.semaphore.try_acquire() { permit.forget(); @@ -485,14 +496,14 @@ impl>> Pool { } } // Create a new VecDeque with a smaller capacity - let mut vec = VecDeque::with_capacity(max_size); + let mut vec = VecDeque::with_capacity(new_max_size); for obj in slots.vec.drain(..) { vec.push_back(obj); } slots.vec = vec; } // grow pool - if max_size > old_max_size { + if new_max_size > old_max_size { let additional = slots.max_size - slots.size; slots.vec.reserve_exact(additional); self.inner.semaphore.add_permits(additional); @@ -536,9 +547,23 @@ impl>> Pool { guard.size -= len_before - guard.vec.len(); } + /// Get current pool configuration + pub fn config(&self) -> Arc { + self.inner.config.load_full() + } + + /// Swap config. + pub fn swap_config(&self, config: PoolConfig) { + let slots = self.inner.slots.lock().unwrap(); + self.inner.config.swap(Arc::new(config)); + if slots.max_size != config.max_size { + self._resize(config.max_size, slots); + } + } + /// Get current timeout configuration pub fn timeouts(&self) -> Timeouts { - self.inner.config.timeouts + self.inner.config.load().timeouts } /// Closes this [`Pool`]. @@ -584,7 +609,7 @@ struct PoolInner { /// the number of [`Future`]s waiting for an [`Object`]. available: AtomicIsize, semaphore: Semaphore, - config: PoolConfig, + config: ArcSwap, runtime: Option, hooks: hooks::Hooks, } diff --git a/tests/managed.rs b/tests/managed.rs index b56be15c..634f3836 100644 --- a/tests/managed.rs +++ b/tests/managed.rs @@ -261,6 +261,64 @@ async fn resize_pool_grow_concurrent() { assert_eq!(pool.status().available, 1); } +#[tokio::test] +async fn resize_pool_with_config() { + let mgr = Manager {}; + let pool = Pool::builder(mgr).max_size(0).build().unwrap(); + let join_handle = { + let pool = pool.clone(); + tokio::spawn(async move { pool.get().await }) + }; + tokio::task::yield_now().await; + assert_eq!( + pool.status(), + Status { + max_size: 0, + size: 0, + available: -1 + } + ); + pool.swap_config(PoolConfig { + max_size: 1, + ..pool.config() + }); + assert_eq!( + pool.status(), + Status { + max_size: 1, + size: 0, + available: -1 + } + ); + tokio::task::yield_now().await; + assert_eq!( + pool.status(), + Status { + max_size: 1, + size: 1, + available: 0, + } + ); + let obj0 = join_handle.await.unwrap().unwrap(); + assert_eq!( + pool.status(), + Status { + max_size: 1, + size: 1, + available: 0 + } + ); + pool.swap_config(PoolConfig { + max_size: 2, + ..pool.config() + }); + assert_eq!(pool.status(), Status { + max_size: 2, + size: 1, + available, 0, + }); +} + #[tokio::test] async fn retain() { let mgr = Manager {};