diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c index 49658f3bf..2e3debad7 100644 --- a/src/platform/posix/posix_pollq_port.c +++ b/src/platform/posix/posix_pollq_port.c @@ -30,6 +30,8 @@ typedef struct nni_posix_pollq { int port; // port id (from port_create) nni_thr thr; // worker thread + nni_mtx mtx; + nni_cv cv; } nni_posix_pollq; // single global instance for now @@ -45,15 +47,13 @@ nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg) (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); - nni_mtx_init(&pfd->mtx); - nni_cv_init(&pfd->cv, &pfd->mtx); - pfd->closed = false; - pfd->closing = false; - pfd->fd = fd; - pfd->pq = pq; - pfd->cb = cb; - pfd->arg = arg; - pfd->data = NULL; + nni_atomic_init(&pfd->events); + pfd->closed = false; + pfd->fd = fd; + pfd->pq = pq; + pfd->cb = cb; + pfd->arg = arg; + pfd->data = NULL; } int @@ -71,13 +71,8 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) return; } - nni_mtx_lock(&pfd->mtx); - if (!pfd->closing) { - pfd->closing = true; - (void) shutdown(pfd->fd, SHUT_RDWR); - port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd); - } - nni_mtx_unlock(&pfd->mtx); + (void) shutdown(pfd->fd, SHUT_RDWR); + port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd); // Send the wake event to the poller to synchronize with it. // Note that port_send should only really fail if out of memory @@ -102,11 +97,11 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd) } sched_yield(); // try again later... } - nni_mtx_lock(&pfd->mtx); + nni_mtx_lock(&pq->mtx); while (!pfd->closed) { - nni_cv_wait(&pfd->cv); + nni_cv_wait(&pq->cv); } - nni_mtx_unlock(&pfd->mtx); + nni_mtx_unlock(&pq->mtx); } void @@ -122,27 +117,21 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) // We're exclusive now. (void) close(pfd->fd); - nni_cv_fini(&pfd->cv); - nni_mtx_fini(&pfd->mtx); } int nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) { nni_posix_pollq *pq = pfd->pq; + int rv; + int ev = (int) events; - nni_mtx_lock(&pfd->mtx); - if (!pfd->closing) { - pfd->events |= events; - if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, - (int) pfd->events, pfd) != 0) { - int rv = nni_plat_errno(errno); - nni_mtx_unlock(&pfd->mtx); - return (rv); - } + ev |= ni_atomic_or(&pfd->events, ev); + rv = port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, ev, pfd); + if (rv != 0) { + nni_plat_errno(errno); } - nni_mtx_unlock(&pfd->mtx); - return (0); + return (rv); } static void @@ -152,7 +141,7 @@ nni_posix_poll_thr(void *arg) nni_posix_pollq *pq = arg; port_event_t ev[NNI_MAX_PORTEV]; nni_posix_pfd *pfd; - unsigned events; + int events; nni_posix_pfd_cb cb; void *arg; unsigned n; @@ -168,40 +157,35 @@ nni_posix_poll_thr(void *arg) // We run through the returned ports twice. First we // get the callbacks. Then we do the reaps. This way // we ensure that we only reap *after* callbacks have run. + bool user_wake = false; for (unsigned i = 0; i < n; i++) { - if (ev[i].portev_source != PORT_SOURCE_FD) { + switch (ev[i].portev_source) { + case PORT_SOURCE_USER: + user_wake = true; continue; - } - pfd = ev[i].portev_user; - events = ev[i].portev_events; - - nni_mtx_lock(&pfd->mtx); - cb = pfd->cb; - arg = pfd->data; - pfd->events &= ~events; - nni_mtx_unlock(&pfd->mtx); - - if (cb != NULL) { - cb(pfd, events, arg); + case PORT_SOURCE_FD: + if (ev[i].portev_source != PORT_SOURCE_FD) { + continue; + } + pfd = ev[i].portev_user; + events = ev[i].portev_events; + + cb = pfd->cb; + arg = pfd->data; + nni_atomic_and(&pfd->events, ~events); + + cb(pfd, (unsigned) events, arg); } } - for (unsigned i = 0; i < n; i++) { - if (ev[i].portev_source != PORT_SOURCE_USER) { - continue; + if (user_wake) { + nni_mtx_lock(&pq->mtx); + for (unsigned i = 0; i < n; i++) { + if (ev[i].portev_source == PORT_SOURCE_USER) { + pfd->closed = true; + } } - - // User event telling us to stop doing things. - // We signal back to use this as a coordination - // event between the pollq and the thread - // handler. NOTE: It is absolutely critical - // that there is only a single thread per - // pollq. Otherwise we cannot be sure that we - // are blocked completely, - pfd = ev[i].portev_user; - nni_mtx_lock(&pfd->mtx); - pfd->closed = true; - nni_cv_wake(&pfd->cv); - nni_mtx_unlock(&pfd->mtx); + nni_cv_wake(&pq->cv); + nni_mtx_unlock(&pq->mtx); } } } @@ -210,6 +194,8 @@ static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { (void) close(pq->port); + nni_cv_destroy(&pq->cv); + nni_mtx_fini(&pq->mtx); nni_thr_fini(&pq->thr); } @@ -228,6 +214,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq) } nni_thr_set_name(&pq->thr, "nng:poll:port"); + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, pq->mtx); nni_thr_run(&pq->thr); return (0); } diff --git a/src/platform/posix/posix_pollq_port.h b/src/platform/posix/posix_pollq_port.h index cbeab6940..d8771527a 100644 --- a/src/platform/posix/posix_pollq_port.h +++ b/src/platform/posix/posix_pollq_port.h @@ -20,11 +20,8 @@ typedef struct nni_posix_pollq nni_posix_pollq; struct nni_posix_pfd { nni_posix_pollq *pq; int fd; - nni_mtx mtx; - nni_cv cv; - unsigned events; + nni_atomic_int events; bool closed; - bool closing; nni_posix_pfd_cb cb; void *data; };