Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No recvmsg sendmsg #2032

Merged
merged 4 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
for (unsigned i = 0; i < aio->a_nio; i++) {
aio->a_iov[i] = aio->a_iov[i + 1];
}
aio->a_iov[aio->a_nio].iov_buf = NULL; // serves as indicator
aio->a_iov[aio->a_nio].iov_len = 0; // serves as indicator
}
return (residual); // we might not have used all of n for this iov
}
Expand Down
4 changes: 3 additions & 1 deletion src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ extern void nni_aio_sys_fini(void);

typedef struct nni_aio_expire_q nni_aio_expire_q;

#define NNI_AIO_MAX_IOV 8

// nng_aio is an async I/O handle. The details of this aio structure
// are private to the AIO framework. The structure has the public name
// (nng_aio) so that we minimize the pollution in the public API namespace.
Expand All @@ -216,7 +218,7 @@ struct nng_aio {
nni_task a_task;

// Read/write operations.
nni_iov a_iov[8];
nni_iov a_iov[NNI_AIO_MAX_IOV];
unsigned a_nio;

// Message operations.
Expand Down
18 changes: 18 additions & 0 deletions src/core/aio_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,23 @@ test_aio_busy(void)
NUTS_ASSERT(!nng_aio_busy(aio));
nng_aio_free(aio);
}
void
test_aio_scatter_gather_too_many(void)
{
nng_aio *aio;
nng_iov iov[32];

NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));

for (int i = 0; i < 32; i++) {
iov[i].iov_buf = "abc";
iov[i].iov_len = 3;
}

NUTS_FAIL(nng_aio_set_iov(aio, 32, iov), NNG_EINVAL);

nng_aio_free(aio);
}

NUTS_TESTS = {
{ "sleep", test_sleep },
Expand All @@ -453,5 +470,6 @@ NUTS_TESTS = {
{ "sleep loop", test_sleep_loop },
{ "sleep cancel", test_sleep_cancel },
{ "aio busy", test_aio_busy },
{ "scatter gather too many", test_aio_scatter_gather_too_many },
{ NULL, NULL },
};
2 changes: 2 additions & 0 deletions src/platform/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ if (NNG_PLATFORM_POSIX)
nng_check_func(getentropy NNG_HAVE_GETENTROPY)
nng_check_func(getrandom NNG_HAVE_GETRANDOM)
nng_check_func(arc4random_buf NNG_HAVE_ARC4RANDOM)
nng_check_func(recvmsg NNG_HAVE_RECVMSG)
nng_check_func(sendmsg NNG_HAVE_SENDMSG)

nng_check_func(clock_gettime NNG_HAVE_CLOCK_GETTIME_LIBC)
if (NNG_HAVE_CLOCK_GETTIME_LIBC)
Expand Down
29 changes: 15 additions & 14 deletions src/platform/posix/posix_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ ipc_dowrite(ipc_conn *c)
}

while ((aio = nni_list_first(&c->writeq)) != NULL) {
unsigned i;
int n;
int niov;
unsigned naiov;
nni_iov *aiov;
struct msghdr hdr;
struct iovec iovec[16];
int n;
unsigned naiov;
nni_iov *aiov;

memset(&hdr, 0, sizeof(hdr));
nni_aio_get_iov(aio, &naiov, &aiov);
NNI_ASSERT(naiov <= NNI_AIO_MAX_IOV);

if (naiov > NNI_NUM_ELEMENTS(iovec)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_EINVAL);
continue;
}
#ifdef NNG_HAVE_SENDMSG
struct msghdr hdr = { 0 };
struct iovec iovec[NNI_AIO_MAX_IOV];
int niov;
unsigned i;

for (niov = 0, i = 0; i < naiov; i++) {
if (aiov[i].iov_len > 0) {
Expand All @@ -67,7 +63,12 @@ ipc_dowrite(ipc_conn *c)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;

if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
n = sendmsg(fd, &hdr, MSG_NOSIGNAL);
#else
// We have to send a bit at a time.
n = send(fd, aiov[0].iov_buf, aiov[0].iov_len, MSG_NOSIGNAL);
#endif
if (n < 0) {
switch (errno) {
case EINTR:
continue;
Expand Down
31 changes: 15 additions & 16 deletions src/platform/posix/posix_tcpconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,19 @@ tcp_dowrite(nni_tcp_conn *c)
}

while ((aio = nni_list_first(&c->writeq)) != NULL) {
unsigned i;
int n;
int niov;
unsigned naiov;
nni_iov *aiov;
struct msghdr hdr;
struct iovec iovec[16];
int n;
unsigned naiov;
nni_iov *aiov;

memset(&hdr, 0, sizeof(hdr));
nni_aio_get_iov(aio, &naiov, &aiov);

if (naiov > NNI_NUM_ELEMENTS(iovec)) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_EINVAL);
continue;
}
NNI_ASSERT(naiov <= NNI_AIO_MAX_IOV);

#ifdef NNG_HAVE_SENDMSG
struct msghdr hdr = { 0 };
struct iovec iovec[NNI_AIO_MAX_IOV];
int niov;
unsigned i;
for (niov = 0, i = 0; i < naiov; i++) {
if (aiov[i].iov_len > 0) {
iovec[niov].iov_len = aiov[i].iov_len;
Expand All @@ -67,7 +63,12 @@ tcp_dowrite(nni_tcp_conn *c)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;

if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
n = sendmsg(fd, &hdr, MSG_NOSIGNAL);
#else
// We have to send a bit at a time.
n = send(fd, aiov[0].iov_buf, aiov[0].iov_len, MSG_NOSIGNAL);
#endif
if (n < 0) {
switch (errno) {
case EINTR:
continue;
Expand All @@ -87,8 +88,6 @@ tcp_dowrite(nni_tcp_conn *c)
}

nni_aio_bump_count(aio, n);
// We completed the entire operation on this aio.
// (Sendmsg never returns a partial result.)
nni_aio_list_remove(aio);
nni_aio_finish(aio, 0, nni_aio_count(aio));

Expand Down
159 changes: 127 additions & 32 deletions src/platform/posix/posix_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,74 @@
nni_posix_udp_doerror(udp, NNG_ECLOSED);
}

// If we need to do sendmsg or recvmsg, then we do this fairly
// awful thing to do a "pull" up. It is important that in such a
// case we must have only a single poller, because we have only
// this single buffer. Performance will be worse, as data copies
// are involved.
#if !defined(NNG_HAVE_RECVMSG) || !defined(NNG_HAVE_SENDMSG)
static uint8_t bouncebuf[65536];
#endif

#if !defined(NNG_HAVE_RECVMSG)
static int
copy_to_bounce(nng_iov *iov, int niov)
{
int room = sizeof(bouncebuf);
uint8_t *buf = bouncebuf;
int len = 0;

for (int i = 0; i < niov && room; i++) {
int n = iov[i].iov_len;
if (n > room) {
n = room;
}
memcpy(buf, iov[i].iov_buf, n);
room -= n;
buf += n;
len += n;
}
return (len);
}
#endif

#if !defined(NNG_HAVE_SENDMSG)
static void
copy_from_bounce(nng_iov *iov, int niov, int len)
{
uint8_t *buf = bouncebuf;
for (int i = 0; i < niov && len; i++) {
int n = iov[i].iov_len;
if (n > len) {
n = len;
}
memcpy(iov[i].iov_buf, buf, n);
len -= n;
buf += n;
}
}
#endif

static void
nni_posix_udp_dorecv(nni_plat_udp *udp)
{
nni_aio *aio;
nni_list *q = &udp->udp_recvq;
// While we're able to recv, do so.
while ((aio = nni_list_first(q)) != NULL) {
struct iovec iov[4];
unsigned niov;
nni_iov *aiov;
nng_iov *aiov;
struct sockaddr_storage ss;
nng_sockaddr *sa;
struct msghdr hdr = { .msg_name = NULL };
int rv = 0;
int cnt = 0;

nni_aio_get_iov(aio, &niov, &aiov);
NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);

#ifdef NNG_HAVE_RECVMSG
struct iovec iov[NNI_AIO_MAX_IOV];
struct msghdr hdr = { .msg_name = NULL };

for (unsigned i = 0; i < niov; i++) {
iov[i].iov_base = aiov[i].iov_buf;
Expand All @@ -119,6 +170,32 @@
nni_posix_sockaddr2nn(
sa, (void *) &ss, hdr.msg_namelen);
}
#else // !NNG_HAVE_RECVMSG
// Here we have to use a bounce buffer
uint8_t *buf;
size_t len;
socklen_t salen;
if (niov == 1) {
buf = aiov[0].iov_buf;
len = aiov[0].iov_len;
} else {
buf = bouncebuf;
len = sizeof(bouncebuf);
}
salen = sizeof(ss);
if ((cnt = recvfrom(udp->udp_fd, buf, len, 0, (void *) &ss,
&salen)) < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return;
}
rv = nni_plat_errno(errno);
} else if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
nni_posix_sockaddr2nn(sa, (void *) &ss, salen);
}
if (niov != 1) {
copy_from_bounce(aiov, niov, cnt);
}
#endif
nni_list_remove(q, aio);
nni_aio_finish(aio, rv, cnt);
}
Expand All @@ -134,43 +211,61 @@
while ((aio = nni_list_first(q)) != NULL) {
struct sockaddr_storage ss;

int len;
int rv = 0;
int cnt = 0;
int salen;
int rv = 0;
int cnt = 0;
unsigned niov;
nni_iov *aiov;

len = nni_posix_nn2sockaddr(&ss, nni_aio_get_input(aio, 0));
if (len < 1) {
nni_aio_get_iov(aio, &niov, &aiov);
NNI_ASSERT(niov <= NNI_AIO_MAX_IOV);
if ((salen = nni_posix_nn2sockaddr(
&ss, nni_aio_get_input(aio, 0))) < 1) {
rv = NNG_EADDRINVAL;
} else {
unsigned niov;
nni_iov *aiov;
struct iovec iov[16];
#ifdef NNG_HAVE_SENDMSG

nni_aio_get_iov(aio, &niov, &aiov);
if (niov > NNI_NUM_ELEMENTS(iov)) {
rv = NNG_EINVAL;
struct iovec iov[NNI_AIO_MAX_IOV];
struct msghdr hdr = { .msg_name = NULL };
for (unsigned i = 0; i < niov; i++) {
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
}
if (rv == 0) {
struct msghdr hdr = { .msg_name = NULL };
for (unsigned i = 0; i < niov; i++) {
iov[i].iov_base = aiov[i].iov_buf;
iov[i].iov_len = aiov[i].iov_len;
hdr.msg_iov = iov;
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = salen;

cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
if (cnt < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
// Cannot send now, leave.
return;

Check warning on line 244 in src/platform/posix/posix_udp.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_udp.c#L244

Added line #L244 was not covered by tests
}
hdr.msg_iov = iov;
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = len;

cnt = sendmsg(udp->udp_fd, &hdr, MSG_NOSIGNAL);
if (cnt < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
// Cannot send now, leave.
return;
}
rv = nni_plat_errno(errno);
rv = nni_plat_errno(errno);
}
#else // !NNG_HAVE_SENDMSG
uint8_t *buf;
size_t len;
if (niov == 1) {
buf = aiov[0].iov_buf;
len = aiov[0].iov_len;
} else {
len = copy_to_bounce(aiov, niov);
buf = bouncebuf;
}
cnt = sendto(
udp->udp_fd, buf, len, 0, (void *) &ss, salen);
if (cnt < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
// Cannot send now, leave.
return;
}
rv = nni_plat_errno(errno);
}
#endif
}

nni_list_remove(q, aio);
Expand Down
Loading
Loading