Skip to content

Commit

Permalink
change to flume
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 27, 2023
1 parent b3de179 commit f7a94f2
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
workspace = { members = ["example"] }
[package]
name = "fast_pool"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
description = "The Fast Pool based on channel"
readme = "README.md"
Expand All @@ -17,5 +17,5 @@ async-trait = "0.1"
futures-core = { version = "0.3" }
tokio = {version = "1",features = ["time","rt-multi-thread","macros"]}
num_cpus = {version = "1.16.0"}
crossfire = "1.0.1"
flume = { version = "0.11.0", default-features = false, features = ["async"] }
[dev-dependencies]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
a fast async pool based on channel
* support `get()`,`get_timeout()`,`state()` methods
* support atomic max_open(Resize freely)
* based on [crossbeam channel(crossfire)](https://crates.io/crates/crossfire)
* based on [flume](https://crates.io/crates/flume)

### way fast_pool?

Expand Down
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
mod defer;

use async_trait::async_trait;
use crossfire::mpmc::{RxUnbounded, TxUnbounded};
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use flume::{Receiver, Sender};

/// Pool have manager, get/get_timeout Connection from Pool
pub struct Pool<M: Manager> {
manager: Arc<M>,
idle_send: Arc<TxUnbounded<M::Connection>>,
idle_recv: Arc<RxUnbounded<M::Connection>>,
idle_send: Arc<Sender<M::Connection>>,
idle_recv: Arc<Receiver<M::Connection>>,
max_open: Arc<AtomicU64>,
in_use: Arc<AtomicU64>,
waits: Arc<AtomicU64>,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl<M: Manager> Pool<M> {
<M as Manager>::Connection: Unpin,
{
let default_max = num_cpus::get() as u64 * 4;
let (s, r) = crossfire::mpmc::unbounded_future();
let (s, r) = flume::unbounded();
Self {
manager: Arc::new(m),
idle_send: Arc::new(s),
Expand Down Expand Up @@ -91,7 +91,7 @@ impl<M: Manager> Pool<M> {
.map_err(|e| M::Error::from(&e.to_string()))?;
}
self.idle_recv
.recv()
.recv_async()
.await
.map_err(|e| M::Error::from(&e.to_string()))
};
Expand Down Expand Up @@ -146,7 +146,7 @@ impl<M: Manager> Pool<M> {

pub struct ConnectionBox<M: Manager> {
pub inner: Option<M::Connection>,
sender: Arc<TxUnbounded<M::Connection>>,
sender: Arc<Sender<M::Connection>>,
in_use: Arc<AtomicU64>,
max_open: Arc<AtomicU64>,
}
Expand Down

0 comments on commit f7a94f2

Please sign in to comment.