Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
e1732a364fed committed Jan 1, 2099
1 parent 0021b37 commit 8bd59c3
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 122 deletions.
40 changes: 25 additions & 15 deletions src/net/addr_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ pub async fn cp_addr<R: AddrReadTrait + 'static, W: AddrWriteTrait + 'static>(
}

/// copy data between two [`AddrConn`] struct
///
/// blocking
#[inline]
pub async fn cp(
cid: CID,
Expand All @@ -440,19 +442,9 @@ pub async fn cp(
let cp1 = tokio::spawn(cp_addr(c1.r, c2.w, n1, no_timeout, rx1, false, opt.clone()));
let cp2 = tokio::spawn(cp_addr(c2.r, c1.w, n2, no_timeout, rx2, true, opt.clone()));

let (_tmpx0, tmp_rx0) = oneshot::channel();
let shutdown_rx1 = if let Some(x) = shutdown_rx1 {
x
} else {
tmp_rx0
};

let (_tmpx, tmp_rx) = oneshot::channel();
let shutdown_rx2 = if let Some(x) = shutdown_rx2 {
x
} else {
tmp_rx
};
if let Some(gtr) = &opt {
gtr.alive_connection_count.fetch_add(1, Ordering::Relaxed);
}

let r = tokio::select! {
r = cp1 =>{
Expand All @@ -479,7 +471,13 @@ pub async fn cp(
Err(_) => 0,
}
}
_ = shutdown_rx1 =>{
_ = async{
if let Some(shutdown_rx1) = shutdown_rx1{
shutdown_rx1.await
}else{
std::future::pending().await
}
} =>{
debug!("addrconn cp_between got shutdown1 signal");

let _ = tx1.send(());
Expand All @@ -488,7 +486,13 @@ pub async fn cp(
0
}

_ = shutdown_rx2 =>{
_ = async{
if let Some(shutdown_rx2) = shutdown_rx2{
shutdown_rx2.await
}else{
std::future::pending().await
}
} =>{
debug!("addrconn cp_between got shutdown2 signal");
let _ = tx1.send(());
let _ = tx2.send(());
Expand All @@ -497,6 +501,12 @@ pub async fn cp(
}
};

if let Some(gtr) = opt {
gtr.alive_connection_count
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
info!( cid = %cid, "cp_addr_conn end" );

Ok(r)
}

Expand Down
7 changes: 6 additions & 1 deletion src/net/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ pub type UpdateSender = tokio::sync::mpsc::Sender<(CID, u64)>;
pub type Updater = (UpdateSender, UpdateSender);
pub type OptUpdater = Option<Updater>;

#[allow(unused)]
/// copy between two AsyncConn
///
/// blocking
#[allow(unused_variables)]
pub async fn copy<C1: AsyncConn, C2: AsyncConn>(
local_c: &mut C1,
remote_c: &mut C2,
Expand All @@ -31,6 +34,8 @@ pub async fn copy<C1: AsyncConn, C2: AsyncConn>(

/// cp with updater will send msg when each single read/write ends.
///
/// blocking
///
/// Note: the more info you want to access, the slower performance you would get
#[cfg(feature = "trace")]
pub async fn cp_with_updater<C1: AsyncConn, C2: AsyncConn>(
Expand Down
2 changes: 1 addition & 1 deletion src/net/udp_fixed_listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl AsyncWriteAddr for Writer {
fn poll_close_addr(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let x = &self.conn_map;
let f = x.lock();
let x = Future::poll(Pin::new(&mut Box::pin(f)), cx);
let x = Future::poll(std::pin::pin!(f), cx);
match x {
Poll::Ready(mut map) => {
map.remove(&self.src);
Expand Down
135 changes: 57 additions & 78 deletions src/relay/cp_ac_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::net::CID;
use bytes::BytesMut;
use std::io;
use std::ops::DerefMut;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
Expand All @@ -30,77 +31,53 @@ pub struct CpAddrConnAndConnArgs {
pub no_timeout: bool,
}

/// blocking. discard udp addr part when copy from AddrConn to Conn
pub async fn cp_addr_conn_and_conn(args: CpAddrConnAndConnArgs) -> io::Result<u64> {
let cid = args.cid;
let mut ac = args.ac;
let mut c = args.c;
let ed_from_ac = args.ed_from_ac;
let ed = args.ed;
let first_target = args.first_target;
let gtr = args.gtr;
let no_timeout = args.no_timeout;
use crate::Name;
info!(cid = %cid, ac = ac.name(), "cp_addr_conn_and_conn start",);

let tic = gtr.clone();
if let Some(ed) = args.ed {
if ed_from_ac {
c.write_all(&ed).await?;
} else {
ac.w.write(&ed, &args.first_target.unwrap_or_default())
.await?;
}
}

if let Some(gtr) = &gtr {
gtr.alive_connection_count.fetch_add(1, Ordering::Relaxed);
}

scopeguard::defer! {

if let Some(gtr) = tic {
if let Some(gtr) = &gtr {
gtr.alive_connection_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

}
info!( cid = %cid,
"cp_addr_conn_and_conn end", );
}
//might discard udp addr part

if let Some(ed) = ed {
if ed_from_ac {
let r = c.write(&ed).await;
if r.is_err() {
return r.map(|x| x as u64);
}
} else {
let r = ac.w.write(&ed, &first_target.unwrap_or_default()).await;
if r.is_err() {
return r.map(|x| x as u64);
}
}
}

let (mut r, mut w) = tokio::io::split(c);

if no_timeout {
let (c1_to_c2, c2_to_c1) = (
cp_conn_to_addr_conn(&mut r, ac.w),
cp_addr_conn_to_conn(ac.r, &mut w),
);

tokio::select! {
r1 = c1_to_c2 => {

r1
}
r2 = c2_to_c1 => {

r2
}
tokio::select! {
r1 = cp_conn_to_addr_conn(&mut r, ac.w) => {
r1
}
} else {
let (c1_to_c2, c2_to_c1) = (
cp_conn_to_addr_conn(&mut r, ac.w),
cp_addr_conn_to_conn_timeout(ac.r, &mut w),
);

tokio::select! {
r1 = c1_to_c2 => {

r1
}
r2 = c2_to_c1 => {

r2
r2 = async{
if args.no_timeout{
cp_addr_conn_to_conn(ac.r, &mut w).await
}else{
cp_addr_conn_to_conn_timeout(ac.r, &mut w).await
}
} => {
r2
}
}
}
Expand All @@ -110,14 +87,14 @@ where
R: AsyncRead + Unpin + ?Sized,
{
let mut whole: u64 = 0;
let mut buf0 = Box::new([0u8; MTU]);
let mut buf = Box::new([0u8; MTU]);

let a = net::Addr::default();
loop {
let r = r.read(buf0.deref_mut()).await;
let r = r.read(buf.deref_mut()).await;
match r {
Ok(n) => {
let r = w.write(&buf0[..n], &a).await;
let r = w.write(&buf[..n], &a).await;
match r {
Ok(n) => whole += n as u64,
Err(_) => break,
Expand All @@ -140,49 +117,51 @@ where
W: AsyncWrite + Unpin + ?Sized,
{
let mut whole_write = 0;
let mut buf = Box::new([0u8; MTU]);

loop {
let r1ref = &mut r;
let r_ref = &mut r;
let buf_ref = buf.deref_mut();

let sleep_f = tokio::time::sleep(CP_UDP_TIMEOUT);
let read_f = async move {
let mut buf0 = Box::new([0u8; MTU]);
let mut buf = ReadBuf::new(buf0.deref_mut());
let r = r1ref.read(buf.initialized_mut()).await;

(r, buf0)
let mut buf = ReadBuf::new(buf_ref);
r_ref.read(buf.initialized_mut()).await
};

tokio::select! {
_ = sleep_f =>{
debug!("read addrconn timeout");
_ = tokio::time::sleep(CP_UDP_TIMEOUT) =>{
debug!("cp_addr_conn_to_conn timeout");

break;
}
r = read_f =>{
let (r, buf0) = r;

match r {
Err(_) => break,
Err(e) => {
debug!("cp_addr_to_conn, read got e, will break: {e}");
break
},
Ok((m, _ad)) => {
if m > 0 {
//debug!("cp_addr_to_conn, read got {m}");
let r = w.write(&buf0[..m]).await;
if let Ok(n) = r{
//debug!("cp_addr_to_conn, write ok {n}");

whole_write += n;

let r = w.flush().await;
if r.is_err(){
debug!("cp_addr_to_conn, write flush not ok ");
let r = w.write(&buf[..m]).await;
match r{
Ok(n) => {
//debug!("cp_addr_to_conn, write ok {n}");

whole_write += n;

let r = w.flush().await;
if let Err(e) = r{
debug!("cp_addr_to_conn, write flush not ok: {e}");
break;
}
},
Err(_) => {
debug!("cp_addr_to_conn, write not ok ");
break;
}

}else{
debug!("cp_addr_to_conn, write not ok ");
break;
},
}

}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/relay/cp_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn no_gtr_ed(
)
.await;

debug!("{}, relay end", cid);
debug!(cid = %cid, "relay end");
}

async fn gtr_ed(
Expand Down
Loading

0 comments on commit 8bd59c3

Please sign in to comment.