Skip to content

Commit

Permalink
chore(deps): upgrade to nix v0.27 (#630)
Browse files Browse the repository at this point in the history
* refactor: try to upgrade nix to v0.26 with replacing `SockAddr` with new `SockaddrStorage`

* refactor: use `ìmpl SockaddrLike` instead of `SockAddr` in `GlommioDatagram::send_to`

* chore(deps): upgrade to nix v0.27

* refactor: cleanup nix work

---------

Co-authored-by: Glauber Costa <[email protected]>
  • Loading branch information
aurelilys and Glauber Costa authored Jan 4, 2024
1 parent 8d93607 commit a14a826
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 129 deletions.
2 changes: 1 addition & 1 deletion glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lazy_static = "1.4"
libc = "0.2"
lockfree = "0.5"
log = "0.4"
nix = "0.23"
nix = { version = "0.27", features = ["event", "fs", "ioctl", "mman", "net", "poll", "sched", "time"] }
pin-project-lite = "0.2"
rlimit = "0.6"
scoped-tls = "1.0"
Expand Down
17 changes: 7 additions & 10 deletions glommio/src/iou/sqe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ use std::{

use super::registrar::{UringFd, UringReadBuf, UringWriteBuf};

use nix::sys::socket::{SockaddrLike, SockaddrStorage};
pub use nix::{
fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice},
poll::PollFlags,
sys::{
epoll::{EpollEvent, EpollOp},
mman::MmapAdvise,
socket::{MsgFlags, SockAddr, SockFlag},
socket::{MsgFlags, SockFlag},
stat::Mode,
},
};

use super::Personality;
use crate::{sys::Statx, to_io_error, uring_sys};
use crate::{sys::Statx, uring_sys};

/// A pending IO event.
///
Expand Down Expand Up @@ -352,9 +353,10 @@ impl<'a> SQE<'a> {
}

#[inline]
pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockAddr) {
let (addr, len) = socket_addr.as_ffi_pair();
uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *const _ as *mut _, len);
pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockaddrStorage) {
let addr = socket_addr.as_ptr();
let len = socket_addr.len();
uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *mut _, len);
fd.update_sqe(self);
}

Expand Down Expand Up @@ -521,11 +523,6 @@ impl SockAddrStorage {
let len = mem::size_of::<nix::sys::socket::sockaddr_storage>();
SockAddrStorage { storage, len }
}

pub unsafe fn as_socket_addr(&self) -> io::Result<SockAddr> {
let storage = &*self.storage.as_ptr();
nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| to_io_error!(e))
}
}

#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
Expand Down
30 changes: 16 additions & 14 deletions glommio/src/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use crate::{
sys::{self, DmaBuffer, Source, SourceType},
sys::{DmaBuffer, Source, SourceType},
ByteSliceMutExt, Reactor,
};
use nix::sys::socket::MsgFlags;
use nix::sys::socket::{MsgFlags, SockaddrLike};
use std::{
cell::Cell,
io,
Expand Down Expand Up @@ -95,10 +95,10 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
self.consume_receive_buffer(source, buf).await
}

pub(crate) async fn peek_from(
pub(crate) async fn peek_from<T: SockaddrLike>(
&self,
buf: &mut [u8],
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
) -> io::Result<(usize, T)> {
match self.yolo_recvmsg(buf, MsgFlags::MSG_PEEK) {
Some(res) => res,
None => self.recv_from_blocking(buf, MsgFlags::MSG_PEEK).await,
Expand All @@ -119,11 +119,11 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
}
}

pub(crate) async fn recv_from_blocking(
pub(crate) async fn recv_from_blocking<T: SockaddrLike>(
&self,
buf: &mut [u8],
flags: MsgFlags,
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
) -> io::Result<(usize, T)> {
let source = self.reactor.upgrade().unwrap().rushed_recvmsg(
self.socket.as_raw_fd(),
buf.len(),
Expand All @@ -136,7 +136,9 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
let mut src = src.take().unwrap();
src.trim_to_size(sz);
buf[0..sz].copy_from_slice(&src.as_bytes()[0..sz]);
let addr = unsafe { sys::ssptr_to_sockaddr(addr, hdr.msg_namelen as _)? };
let addr = unsafe {
T::from_raw(addr.as_ptr() as *const _, Some(hdr.msg_namelen)).unwrap()
};
self.rx_yolo.set(true);
Ok((sz, addr))
}
Expand Down Expand Up @@ -172,10 +174,10 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
self.read_timeout.get()
}

pub(crate) async fn recv_from(
pub(crate) async fn recv_from<T: SockaddrLike>(
&self,
buf: &mut [u8],
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
) -> io::Result<(usize, T)> {
match self.yolo_recvmsg(buf, MsgFlags::empty()) {
Some(res) => res,
None => self.recv_from_blocking(buf, MsgFlags::empty()).await,
Expand All @@ -185,7 +187,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
pub(crate) async fn send_to_blocking(
&self,
buf: &[u8],
sockaddr: nix::sys::socket::SockAddr,
sockaddr: impl nix::sys::socket::SockaddrLike,
) -> io::Result<usize> {
let mut dma = self.allocate_buffer(buf.len());
assert_eq!(dma.write_at(0, buf), buf.len());
Expand All @@ -203,7 +205,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
pub(crate) async fn send_to(
&self,
buf: &[u8],
addr: nix::sys::socket::SockAddr,
addr: impl nix::sys::socket::SockaddrLike,
) -> io::Result<usize> {
match self.yolo_sendmsg(buf, &addr) {
Some(res) => res,
Expand Down Expand Up @@ -245,11 +247,11 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
})
}

fn yolo_recvmsg(
fn yolo_recvmsg<T: SockaddrLike>(
&self,
buf: &mut [u8],
flags: MsgFlags,
) -> Option<io::Result<(usize, nix::sys::socket::SockAddr)>> {
) -> Option<io::Result<(usize, T)>> {
if self.rx_yolo.get() {
super::yolo_recvmsg(self.socket.as_raw_fd(), buf, flags)
} else {
Expand All @@ -276,7 +278,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
fn yolo_sendmsg(
&self,
buf: &[u8],
addr: &nix::sys::socket::SockAddr,
addr: &impl nix::sys::socket::SockaddrLike,
) -> Option<io::Result<usize>> {
if self.tx_yolo.get() {
super::yolo_sendmsg(self.socket.as_raw_fd(), buf, addr)
Expand Down
8 changes: 4 additions & 4 deletions glommio/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//
//! This module provides glommio's networking support.
use crate::sys;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::{MsgFlags, SockaddrLike};
use std::{io, os::unix::io::RawFd};

fn yolo_accept(fd: RawFd) -> Option<io::Result<RawFd>> {
Expand Down Expand Up @@ -67,11 +67,11 @@ fn yolo_recv(fd: RawFd, buf: &mut [u8]) -> Option<io::Result<usize>> {
}
}

fn yolo_recvmsg(
fn yolo_recvmsg<T: SockaddrLike>(
fd: RawFd,
buf: &mut [u8],
flags: MsgFlags,
) -> Option<io::Result<(usize, nix::sys::socket::SockAddr)>> {
) -> Option<io::Result<(usize, T)>> {
match sys::recvmsg_syscall(
fd,
buf.as_mut_ptr(),
Expand All @@ -89,7 +89,7 @@ fn yolo_recvmsg(
fn yolo_sendmsg(
fd: RawFd,
buf: &[u8],
addr: &nix::sys::socket::SockAddr,
addr: &impl nix::sys::socket::SockaddrLike,
) -> Option<io::Result<usize>> {
match sys::sendmsg_syscall(
fd,
Expand Down
17 changes: 8 additions & 9 deletions glommio/src/net/tcp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures_lite::{
io::{AsyncBufRead, AsyncRead, AsyncWrite},
stream::{self, Stream},
};
use nix::sys::socket::{InetAddr, SockAddr};
use nix::sys::socket::SockaddrStorage;
use pin_project_lite::pin_project;
use socket2::{Domain, Protocol, Socket, Type};
use std::{
Expand Down Expand Up @@ -393,16 +393,14 @@ impl FromRawFd for TcpStream {
}
}

fn make_tcp_socket(addr: &SocketAddr) -> io::Result<(SockAddr, Socket)> {
fn make_tcp_socket(addr: &SocketAddr) -> io::Result<Socket> {
let domain = if addr.is_ipv6() {
Domain::IPV6
} else {
Domain::IPV4
};
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
let inet = InetAddr::from_std(addr);
let addr = SockAddr::new_inet(inet);
Ok((addr, socket))
Ok(socket)
}

impl TcpStream {
Expand All @@ -420,9 +418,9 @@ impl TcpStream {
/// ```
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<TcpStream> {
let addr = addr.to_socket_addrs()?.next().unwrap();
let (addr, socket) = make_tcp_socket(&addr)?;
let socket = make_tcp_socket(&addr)?;
let reactor = crate::executor().reactor();
let source = reactor.connect(socket.as_raw_fd(), addr);
let source = reactor.connect(socket.as_raw_fd(), SockaddrStorage::from(addr));
source.collect_rw().await?;

Ok(TcpStream {
Expand Down Expand Up @@ -463,9 +461,10 @@ impl TcpStream {
}

let addr = addr.to_socket_addrs()?.next().unwrap();
let (addr, socket) = make_tcp_socket(&addr)?;
let socket = make_tcp_socket(&addr)?;
let reactor = crate::executor().reactor();
let source = reactor.connect_timeout(socket.as_raw_fd(), addr, duration);
let source =
reactor.connect_timeout(socket.as_raw_fd(), SockaddrStorage::from(addr), duration);

// connect_timeout submits two sqes to io_uring: a connect sqe soft-linked
// with a LINK_TIMEOUT sqe. If the timeout fires, the connect sqe fails with
Expand Down
62 changes: 30 additions & 32 deletions glommio/src/net/udp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use super::datagram::GlommioDatagram;
use nix::sys::socket::{InetAddr, SockAddr};
use nix::sys::socket::{SockaddrLike, SockaddrStorage};
use socket2::{Domain, Protocol, Socket, Type};
use std::{
io,
net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
os::unix::io::{AsRawFd, FromRawFd, RawFd},
time::Duration,
};
Expand Down Expand Up @@ -42,6 +42,23 @@ impl FromRawFd for UdpSocket {
}
}

fn sockaddr_storage_to_std(addr: SockaddrStorage) -> Option<SocketAddr> {
match addr.family() {
Some(nix::sys::socket::AddressFamily::Inet) => addr
.as_sockaddr_in()
.map(|x| SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(x.ip()), x.port()))),
Some(nix::sys::socket::AddressFamily::Inet6) => addr.as_sockaddr_in6().map(|x| {
SocketAddr::V6(SocketAddrV6::new(
x.ip(),
x.port(),
x.flowinfo(),
x.scope_id(),
))
}),
_ => None,
}
}

impl UdpSocket {
/// Creates a UDP socket bound to the specified address.
///
Expand Down Expand Up @@ -121,10 +138,8 @@ impl UdpSocket {
let iter = addr.to_socket_addrs()?;
let mut err = io::Error::new(io::ErrorKind::Other, "No Valid addresses");
for addr in iter {
let inet = InetAddr::from_std(&addr);
let addr = SockAddr::new_inet(inet);
let reactor = self.socket.reactor.upgrade().unwrap();
let source = reactor.connect(self.socket.as_raw_fd(), addr);
let source = reactor.connect(self.socket.as_raw_fd(), SockaddrStorage::from(addr));
match source.collect_rw().await {
Ok(_) => return Ok(()),
Err(x) => {
Expand Down Expand Up @@ -494,12 +509,8 @@ impl UdpSocket {
/// supplied buffer, excess bytes may be discarded.
pub async fn peek_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
let (sz, addr) = self.socket.peek_from(buf).await?;

let addr = match addr {
nix::sys::socket::SockAddr::Inet(addr) => addr,
x => panic!("invalid socket addr for this family!: {:?}", x),
};
Ok((sz, addr.to_std()))
let addr = sockaddr_storage_to_std(addr).expect("invalid socket addr for this family!");
Ok((sz, addr))
}

/// Returns the socket address of the remote peer this socket was connected
Expand Down Expand Up @@ -581,11 +592,8 @@ impl UdpSocket {
/// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
let (sz, addr) = self.socket.recv_from(buf).await?;
let addr = match addr {
nix::sys::socket::SockAddr::Inet(addr) => addr,
x => panic!("invalid socket addr for this family!: {:?}", x),
};
Ok((sz, addr.to_std()))
let addr = sockaddr_storage_to_std(addr).expect("invalid socket addr for this family!");
Ok((sz, addr))
}

/// Sends data on the socket to the given address. On success, returns the
Expand Down Expand Up @@ -622,8 +630,7 @@ impl UdpSocket {
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?;

let inet = nix::sys::socket::InetAddr::from_std(&addr);
let sockaddr = nix::sys::socket::SockAddr::new_inet(inet);
let sockaddr = SockaddrStorage::from(addr);
self.socket.send_to(buf, sockaddr).await.map_err(Into::into)
}

Expand Down Expand Up @@ -659,7 +666,7 @@ impl UdpSocket {
mod tests {
use super::*;
use crate::{timer::Timer, LocalExecutorBuilder};
use nix::sys::socket::MsgFlags;
use nix::sys::socket::{MsgFlags, SockaddrIn};
use std::time::Duration;

macro_rules! connected_pair {
Expand Down Expand Up @@ -879,7 +886,7 @@ mod tests {
for _ in 0..10 {
let (sz, _) = receiver
.socket
.recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK)
.recv_from_blocking::<SockaddrIn>(&mut buf, MsgFlags::MSG_PEEK)
.await
.unwrap();
assert_eq!(sz, 1);
Expand All @@ -889,11 +896,7 @@ mod tests {
.recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK)
.await
.unwrap();
let addr = match from {
nix::sys::socket::SockAddr::Inet(addr) => addr,
x => panic!("invalid socket addr for this family!: {:?}", x),
};
addr.to_std()
sockaddr_storage_to_std(from).expect("invalid socket addr for this family!")
})
.detach();

Expand Down Expand Up @@ -940,11 +943,7 @@ mod tests {
.await
.unwrap();
assert_eq!(sz, 1);
let addr = match from {
nix::sys::socket::SockAddr::Inet(addr) => addr,
x => panic!("invalid socket addr for this family!: {:?}", x),
};
addr.to_std()
sockaddr_storage_to_std(from).expect("invalid socket addr for this family!")
})
.detach();

Expand Down Expand Up @@ -980,8 +979,7 @@ mod tests {
let receiver = UdpSocket::bind("127.0.0.1:0").unwrap();
let addr = receiver.local_addr().unwrap();

let inet = nix::sys::socket::InetAddr::from_std(&addr);
let sockaddr = nix::sys::socket::SockAddr::new_inet(inet);
let sockaddr = SockaddrStorage::from(addr);
let me = UdpSocket::bind("127.0.0.1:0").unwrap();
me.socket
.send_to_blocking(&[65u8; 1], sockaddr)
Expand Down
Loading

0 comments on commit a14a826

Please sign in to comment.