diff --git a/rt/src/net/uds/stream.rs b/rt/src/net/uds/stream.rs index b8441b7f..e8fdc3ab 100644 --- a/rt/src/net/uds/stream.rs +++ b/rt/src/net/uds/stream.rs @@ -90,6 +90,19 @@ impl UnixStream { socket } + /// Converts a [`std::os::unix::net::UnixStream`] to a + /// [`heph_rt::net::UnixStream`]. + /// + /// [`heph_rt::net::UnixStream`]: UnixStream + pub fn from_std(rt: &RT, stream: std::os::unix::net::UnixStream) -> UnixStream + where + RT: Access, + { + UnixStream { + fd: AsyncFd::new(stream.into(), rt.submission_queue()), + } + } + /// Automatically set the CPU affinity based on the runtime access `rt`. /// /// For non-Linux OSs this is a no-op. If `rt` is not local this is also a diff --git a/rt/tests/functional/uds/stream.rs b/rt/tests/functional/uds/stream.rs index d49a85df..643bcf6f 100644 --- a/rt/tests/functional/uds/stream.rs +++ b/rt/tests/functional/uds/stream.rs @@ -8,7 +8,8 @@ use std::time::Duration; use heph::actor::{self, actor_fn}; use heph_rt::net::uds::{UnixAddr, UnixStream}; use heph_rt::spawn::ActorOptions; -use heph_rt::test::{join, try_spawn_local, PanicSupervisor}; +use heph_rt::test::{block_on_local_actor, join, try_spawn_local, PanicSupervisor}; +use heph_rt::ThreadLocal; use heph_rt::{self as rt}; use crate::util::temp_file; @@ -97,3 +98,22 @@ fn connect() { let actor_ref = try_spawn_local(PanicSupervisor, actor, (), ActorOptions::default()).unwrap(); join(&actor_ref, Duration::from_secs(1)).unwrap(); } + +#[test] +fn stream_from_std() { + async fn actor(ctx: actor::Context) -> io::Result<()> { + let (mut peer, stream) = net::UnixStream::pair()?; + let stream = UnixStream::from_std(ctx.runtime_ref(), stream); + + stream.send_all(DATA).await?; + + let mut buf = vec![0; DATA.len() + 2]; + let n = peer.read(&mut buf)?; + assert_eq!(n, DATA.len()); + assert_eq!(&buf[..n], DATA); + + Ok(()) + } + + block_on_local_actor(actor_fn(actor), ()).unwrap(); +}