Skip to content

Commit

Permalink
add tests for futures-io feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang authored and sticnarf committed Jul 25, 2024
1 parent aa311ea commit 309a225
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 21 deletions.
105 changes: 105 additions & 0 deletions tests/common/futures_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use super::*;
use async_std::{net::TcpListener, os::unix::net::UnixStream};
use futures_util::{io::copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use once_cell::sync::OnceCell;
use std::{
future::Future,
io::{Read, Write},
net::{SocketAddr, TcpStream as StdTcpStream},
sync::Mutex,
};
use tokio_socks::{
io::Compat,
tcp::{socks4::Socks4Listener, socks5::Socks5Listener},
Error,
Result,
};

pub async fn echo_server() -> Result<()> {
let listener = TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], 10007))).await?;
loop {
let (stream, _) = listener.accept().await?;
async_std::task::spawn(async move {
let (mut reader, mut writer) = stream.split();
copy(&mut reader, &mut writer).await.unwrap();
});
}
}

pub async fn reply_response<S: AsyncRead + AsyncWrite + Unpin>(mut socket: S) -> Result<[u8; 5]> {
socket.write_all(MSG).await?;
let mut buf = [0; 5];
socket.read_exact(&mut buf).await?;
Ok(buf)
}

pub async fn test_connect<S: AsyncRead + AsyncWrite + Unpin>(socket: S) -> Result<()> {
let res = reply_response(socket).await?;
assert_eq!(&res[..], MSG);
Ok(())
}

pub fn test_bind<S: 'static + AsyncRead + AsyncWrite + Unpin + Send>(
listener: Socks5Listener<Compat<S>>,
) -> Result<()> {
let bind_addr = listener.bind_addr().to_owned();
runtime().lock().unwrap().spawn(async move {
let stream = listener.accept().await.unwrap();
let (mut reader, mut writer) = stream.split();
copy(&mut reader, &mut writer).await.unwrap();
});

let mut tcp = StdTcpStream::connect(bind_addr)?;
tcp.write_all(MSG)?;
let mut buf = [0; 5];
tcp.read_exact(&mut buf[..])?;
assert_eq!(&buf[..], MSG);
Ok(())
}

pub async fn connect_unix(proxy_addr: &str) -> Result<UnixStream> {
UnixStream::connect(proxy_addr).await.map_err(Error::Io)
}

pub struct Runtime;

impl Runtime {
pub fn spawn<F, T>(&self, future: F)
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
async_std::task::spawn(future);
}

pub fn block_on<F, T>(&self, future: F) -> T
where F: Future<Output = T> {
async_std::task::block_on(future)
}
}

pub fn runtime() -> &'static Mutex<Runtime> {
static RUNTIME: OnceCell<Mutex<Runtime>> = OnceCell::new();
RUNTIME.get_or_init(|| {
async_std::task::spawn(async { echo_server().await.expect("Unable to bind") });
Mutex::new(Runtime)
})
}

pub fn test_bind_socks4<S: 'static + AsyncRead + AsyncWrite + Unpin + Send>(
listener: Socks4Listener<Compat<S>>,
) -> Result<()> {
let bind_addr = listener.bind_addr().to_owned();
runtime().lock().unwrap().spawn(async move {
let stream = listener.accept().await.unwrap();
let (mut reader, mut writer) = AsyncReadExt::split(stream);
copy(&mut reader, &mut writer).await.unwrap();
});

let mut tcp = StdTcpStream::connect(bind_addr)?;
tcp.write_all(MSG)?;
let mut buf = [0; 5];
tcp.read_exact(&mut buf[..])?;
assert_eq!(&buf[..], MSG);
Ok(())
}
17 changes: 17 additions & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#![allow(dead_code)]
#[cfg(feature = "futures-io")]
pub mod futures_utils;
#[cfg(feature = "tokio")]
pub mod tokio_utils;

#[cfg(feature = "futures-io")]
pub use tokio_socks::io::FuturesIoCompatExt as _;
#[cfg(feature = "tokio")]
pub use tokio_utils::*;

pub const UNIX_PROXY_ADDR: &str = "/tmp/proxy.s";
pub const PROXY_ADDR: &str = "127.0.0.1:41080";
pub const UNIX_SOCKS4_PROXY_ADDR: &str = "/tmp/socks4_proxy.s";
pub const SOCKS4_PROXY_ADDR: &str = "127.0.0.1:41081";
pub const ECHO_SERVER_ADDR: &str = "localhost:10007";
pub const MSG: &[u8] = b"hello";
16 changes: 7 additions & 9 deletions tests/common.rs → tests/common/tokio_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::*;
use once_cell::sync::OnceCell;
use std::{
io::{Read, Write},
Expand All @@ -9,14 +10,11 @@ use tokio::{
net::{TcpListener, UnixStream},
runtime::Runtime,
};
use tokio_socks::{tcp::socks4::Socks4Listener, tcp::socks5::Socks5Listener, Error, Result};

pub const UNIX_PROXY_ADDR: &'static str = "/tmp/proxy.s";
pub const PROXY_ADDR: &'static str = "127.0.0.1:41080";
pub const UNIX_SOCKS4_PROXY_ADDR: &'static str = "/tmp/socks4_proxy.s";
pub const SOCKS4_PROXY_ADDR: &'static str = "127.0.0.1:41081";
pub const ECHO_SERVER_ADDR: &'static str = "localhost:10007";
pub const MSG: &[u8] = b"hello";
use tokio_socks::{
tcp::{socks4::Socks4Listener, socks5::Socks5Listener},
Error,
Result,
};

pub async fn echo_server() -> Result<()> {
let listener = TcpListener::bind(&SocketAddr::from(([0, 0, 0, 0], 10007))).await?;
Expand Down Expand Up @@ -58,7 +56,7 @@ pub fn test_bind<S: 'static + AsyncRead + AsyncWrite + Unpin + Send>(listener: S
Ok(())
}

pub async fn connect_unix(proxy_addr: &'static str) -> Result<UnixStream> {
pub async fn connect_unix(proxy_addr: &str) -> Result<UnixStream> {
UnixStream::connect(proxy_addr).await.map_err(Error::Io)
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ sleep 2
for test in ${list}; do
3proxy ${dir}/${test}.cfg
sleep 1
cargo test --test ${test} -- --test-threads 1
cargo test --test ${test} --all-features -- --test-threads 1
test_exit_code=$?

pkill -F /tmp/3proxy-test.pid
Expand Down
33 changes: 32 additions & 1 deletion tests/long_username_password_auth.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
mod common;

use common::{connect_unix, runtime, test_bind, test_connect, ECHO_SERVER_ADDR, PROXY_ADDR, UNIX_PROXY_ADDR};
use common::*;
use tokio_socks::{
tcp::socks5::{Socks5Listener, Socks5Stream},
Result,
};

#[cfg(feature = "tokio")]
#[test]
fn connect_long_username_password() -> Result<()> {
let runtime = runtime().lock().unwrap();
Expand All @@ -15,6 +16,7 @@ fn connect_long_username_password() -> Result<()> {
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_long_username_password() -> Result<()> {
let bind = {
Expand All @@ -29,6 +31,7 @@ fn bind_long_username_password() -> Result<()> {
test_bind(bind)
}

#[cfg(feature = "tokio")]
#[test]
fn connect_with_socket_long_username_password() -> Result<()> {
let runtime = runtime().lock().unwrap();
Expand All @@ -39,6 +42,7 @@ fn connect_with_socket_long_username_password() -> Result<()> {
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_with_socket_long_username_password() -> Result<()> {
let bind = {
Expand All @@ -53,3 +57,30 @@ fn bind_with_socket_long_username_password() -> Result<()> {
}?;
test_bind(bind)
}

#[cfg(feature = "futures-io")]
#[test]
fn connect_with_socket_long_username_password_futures_io() -> Result<()> {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime.block_on(futures_utils::connect_unix(UNIX_PROXY_ADDR))?.compat();
let conn = runtime.block_on(Socks5Stream::connect_with_password_and_socket(
socket, ECHO_SERVER_ADDR, "mylonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglogin",
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglongpassword"))?;
runtime.block_on(futures_utils::test_connect(conn))
}

#[cfg(feature = "futures-io")]
#[test]
fn bind_with_socket_long_username_password_futures_io() -> Result<()> {
let bind = {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime.block_on(futures_utils::connect_unix(UNIX_PROXY_ADDR))?.compat();
runtime.block_on(Socks5Listener::bind_with_password_and_socket(
socket,
ECHO_SERVER_ADDR,
"mylonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglogin",
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglongpassword"
))
}?;
futures_utils::test_bind(bind)
}
27 changes: 25 additions & 2 deletions tests/no_auth.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
mod common;

use crate::common::{runtime, test_bind};
use common::{connect_unix, test_connect, ECHO_SERVER_ADDR, PROXY_ADDR, UNIX_PROXY_ADDR};
use common::*;
use tokio_socks::{
tcp::socks5::{Socks5Listener, Socks5Stream},
Result,
};

#[cfg(feature = "tokio")]
#[test]
fn connect_no_auth() -> Result<()> {
let runtime = runtime().lock().unwrap();
let conn = runtime.block_on(Socks5Stream::connect(PROXY_ADDR, ECHO_SERVER_ADDR))?;
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_no_auth() -> Result<()> {
let bind = {
Expand All @@ -23,6 +24,7 @@ fn bind_no_auth() -> Result<()> {
test_bind(bind)
}

#[cfg(feature = "tokio")]
#[test]
fn connect_with_socket_no_auth() -> Result<()> {
let runtime = runtime().lock().unwrap();
Expand All @@ -31,6 +33,7 @@ fn connect_with_socket_no_auth() -> Result<()> {
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_with_socket_no_auth() -> Result<()> {
let bind = {
Expand All @@ -40,3 +43,23 @@ fn bind_with_socket_no_auth() -> Result<()> {
}?;
test_bind(bind)
}

#[cfg(feature = "futures-io")]
#[test]
fn connect_with_socket_no_auth_futures_io() -> Result<()> {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime.block_on(futures_utils::connect_unix(UNIX_PROXY_ADDR))?.compat();
let conn = runtime.block_on(Socks5Stream::connect_with_socket(socket, ECHO_SERVER_ADDR))?;
runtime.block_on(futures_utils::test_connect(conn))
}

#[cfg(feature = "futures-io")]
#[test]
fn bind_with_socket_no_auth_futures_io() -> Result<()> {
let bind = {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime.block_on(futures_utils::connect_unix(UNIX_PROXY_ADDR))?.compat();
runtime.block_on(Socks5Listener::bind_with_socket(socket, ECHO_SERVER_ADDR))
}?;
futures_utils::test_bind(bind)
}
34 changes: 30 additions & 4 deletions tests/socks4_no_auth.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
mod common;

use crate::common::runtime;
use common::{
connect_unix, test_bind_socks4, test_connect, ECHO_SERVER_ADDR, SOCKS4_PROXY_ADDR, UNIX_SOCKS4_PROXY_ADDR,
};
use common::*;
use tokio_socks::{
tcp::socks4::{Socks4Listener, Socks4Stream},
Result,
};

#[cfg(feature = "tokio")]
#[test]
fn connect_no_auth() -> Result<()> {
let runtime = runtime().lock().unwrap();
let conn = runtime.block_on(Socks4Stream::connect(SOCKS4_PROXY_ADDR, ECHO_SERVER_ADDR))?;
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_no_auth() -> Result<()> {
let bind = {
Expand All @@ -25,6 +24,7 @@ fn bind_no_auth() -> Result<()> {
test_bind_socks4(bind)
}

#[cfg(feature = "tokio")]
#[test]
fn connect_with_socket_no_auth() -> Result<()> {
let runtime = runtime().lock().unwrap();
Expand All @@ -34,6 +34,7 @@ fn connect_with_socket_no_auth() -> Result<()> {
runtime.block_on(test_connect(conn))
}

#[cfg(feature = "tokio")]
#[test]
fn bind_with_socket_no_auth() -> Result<()> {
let bind = {
Expand All @@ -43,3 +44,28 @@ fn bind_with_socket_no_auth() -> Result<()> {
}?;
test_bind_socks4(bind)
}

#[cfg(feature = "futures-io")]
#[test]
fn connect_with_socket_no_auth_futures_io() -> Result<()> {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime
.block_on(futures_utils::connect_unix(UNIX_SOCKS4_PROXY_ADDR))?
.compat();
println!("socket connected");
let conn = runtime.block_on(Socks4Stream::connect_with_socket(socket, ECHO_SERVER_ADDR))?;
runtime.block_on(futures_utils::test_connect(conn))
}

#[cfg(feature = "futures-io")]
#[test]
fn bind_with_socket_no_auth_futures_io() -> Result<()> {
let bind = {
let runtime = futures_utils::runtime().lock().unwrap();
let socket = runtime
.block_on(futures_utils::connect_unix(UNIX_SOCKS4_PROXY_ADDR))?
.compat();
runtime.block_on(Socks4Listener::bind_with_socket(socket, ECHO_SERVER_ADDR))
}?;
futures_utils::test_bind_socks4(bind)
}
Loading

0 comments on commit 309a225

Please sign in to comment.