diff --git a/src/core/aio.c b/src/core/aio.c index e849b33dc..3d4a56c19 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -115,7 +115,7 @@ void nni_aio_fini(nni_aio *aio) { nni_aio_cancel_fn fn; - void *arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; // This is like aio_close, but we don't want to dispatch @@ -247,7 +247,21 @@ nni_aio_close(nni_aio *aio) void nni_aio_set_timeout(nni_aio *aio, nni_duration when) { - aio->a_timeout = when; + aio->a_timeout = when; + aio->a_use_expire = false; +} + +void +nni_aio_set_expire(nni_aio *aio, nni_time expire) +{ + aio->a_expire = expire; + aio->a_use_expire = true; +} + +nng_duration +nni_aio_get_timeout(nni_aio *aio) +{ + return (aio->a_timeout); } void @@ -369,7 +383,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) { nni_aio_expire_q *eq = aio->a_expire_q; - if (!aio->a_sleep) { + if ((!aio->a_sleep) && (!aio->a_use_expire)) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: @@ -411,7 +425,7 @@ void nni_aio_abort(nni_aio *aio, int rv) { nni_aio_cancel_fn fn; - void *arg; + void *arg; nni_aio_expire_q *eq = aio->a_expire_q; nni_mtx_lock(&eq->eq_mtx); @@ -447,8 +461,9 @@ nni_aio_finish_impl( aio->a_msg = msg; } - aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; + aio->a_use_expire = false; nni_mtx_unlock(&eq->eq_mtx); if (sync) { @@ -518,13 +533,14 @@ nni_aio_completions_init(nni_aio_completions *clp) } void -nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count) +nni_aio_completions_add( + nni_aio_completions *clp, nni_aio *aio, int result, size_t count) { NNI_ASSERT(!nni_aio_list_active(aio)); aio->a_reap_node.rn_next = *clp; - aio->a_result = result; - aio->a_count = count; - *clp = aio; + aio->a_result = result; + aio->a_count = count; + *clp = aio; } void @@ -532,10 +548,10 @@ nni_aio_completions_run(nni_aio_completions *clp) { nni_aio *aio; nni_aio *cl = *clp; - *clp = NULL; + *clp = NULL; while ((aio = cl) != NULL) { - cl = (void *)aio->a_reap_node.rn_next; + cl = (void *) aio->a_reap_node.rn_next; aio->a_reap_node.rn_next = NULL; nni_aio_finish_sync(aio, aio->a_result, aio->a_count); } diff --git a/src/core/aio.h b/src/core/aio.h index a2ebf70a9..cae8610f8 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -149,10 +149,12 @@ extern size_t nni_aio_iov_count(nni_aio *); extern int nni_aio_set_iov(nni_aio *, unsigned, const nni_iov *); -extern void nni_aio_set_timeout(nni_aio *, nng_duration); -extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); -extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); -extern void nni_aio_bump_count(nni_aio *, size_t); +extern void nni_aio_set_timeout(nni_aio *, nng_duration); +extern void nni_aio_set_expire(nni_aio *, nni_time); +extern nng_duration nni_aio_get_timeout(nni_aio *); +extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **); +extern void nni_aio_normalize_timeout(nni_aio *, nng_duration); +extern void nni_aio_bump_count(nni_aio *, size_t); // nni_aio_schedule indicates that the AIO has begun, and is scheduled for // asynchronous completion. This also starts the expiration timer. Note that @@ -187,8 +189,8 @@ extern void nni_aio_completions_run(nni_aio_completions *); // nni_aio_completions_add adds an aio (with the result code and length as // appropriate) to the completion list. This should be done while the // appropriate lock is held. The aio must not be scheduled. -extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *, - int, size_t); +extern void nni_aio_completions_add( + nni_aio_completions *, nni_aio *, int, size_t); extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); @@ -202,14 +204,15 @@ typedef struct nni_aio_expire_q nni_aio_expire_q; // any of these members -- the definition is provided here to facilitate // inlining, but that should be the only use. struct nng_aio { - size_t a_count; // Bytes transferred (I/O only) - nni_time a_expire; // Absolute timeout - nni_duration a_timeout; // Relative timeout - int a_result; // Result code (nng_errno) - bool a_stop; // Shutting down (no new operations) - bool a_sleep; // Sleeping with no action - bool a_expire_ok; // Expire from sleep is ok - bool a_expiring; // Expiration in progress + size_t a_count; // Bytes transferred (I/O only) + nni_time a_expire; // Absolute timeout + nni_duration a_timeout; // Relative timeout + int a_result; // Result code (nng_errno) + bool a_stop; // Shutting down (no new operations) + bool a_sleep; // Sleeping with no action + bool a_expire_ok; // Expire from sleep is ok + bool a_expiring; // Expiration in progress + bool a_use_expire; // Use expire instead of timeout nni_task a_task; // Read/write operations. @@ -227,8 +230,8 @@ struct nng_aio { // Provider-use fields. nni_aio_cancel_fn a_cancel_fn; - void *a_cancel_arg; - void *a_prov_data; + void *a_cancel_arg; + void *a_prov_data; nni_list_node a_prov_node; // Linkage on provider list. nni_aio_expire_q *a_expire_q; nni_list_node a_expire_node; // Expiration node diff --git a/src/sp/protocol/survey0/survey.c b/src/sp/protocol/survey0/survey.c index 5c52d8f8e..18074016e 100644 --- a/src/sp/protocol/survey0/survey.c +++ b/src/sp/protocol/survey0/survey.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -25,17 +25,15 @@ typedef struct surv0_ctx surv0_ctx; static void surv0_pipe_send_cb(void *); static void surv0_pipe_recv_cb(void *); -static void surv0_ctx_timeout(void *); struct surv0_ctx { surv0_sock * sock; uint32_t survey_id; // survey id - nni_timer_node timer; - nni_time expire; nni_lmq recv_lmq; nni_list recv_queue; nni_atomic_int recv_buf; nni_atomic_int survey_time; + nni_time expire; int err; }; @@ -99,7 +97,6 @@ surv0_ctx_fini(void *arg) surv0_ctx *ctx = arg; surv0_ctx_close(ctx); - nni_timer_cancel(&ctx->timer); nni_lmq_fini(&ctx->recv_lmq); } @@ -129,7 +126,6 @@ surv0_ctx_init(void *c, void *s) ctx->sock = sock; nni_lmq_init(&ctx->recv_lmq, len); - nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); } static void @@ -155,17 +151,28 @@ surv0_ctx_recv(void *arg, nni_aio *aio) surv0_ctx * ctx = arg; surv0_sock *sock = ctx->sock; nni_msg * msg; + nni_time now; + nni_duration timeout; if (nni_aio_begin(aio) != 0) { return; } + now = nni_clock(); + nni_mtx_lock(&sock->mtx); - if (ctx->survey_id == 0) { + if ((ctx->survey_id == 0) || (now >= ctx->expire)) { nni_mtx_unlock(&sock->mtx); nni_aio_finish_error(aio, NNG_ESTATE); return; } + + timeout = nni_aio_get_timeout(aio); + if ((timeout < 1) || ((now + timeout) > ctx->expire)) { + // limit the timeout to the survey time + nni_aio_set_expire(aio, ctx->expire); + } + again: if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) { int rv; @@ -190,23 +197,6 @@ surv0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_msg(aio, msg); } -void -surv0_ctx_timeout(void *arg) -{ - surv0_ctx * ctx = arg; - surv0_sock *sock = ctx->sock; - - nni_mtx_lock(&sock->mtx); - if (nni_clock() < ctx->expire) { - nni_mtx_unlock(&sock->mtx); - return; - } - - // Abort any pending receives. - surv0_ctx_abort(ctx, NNG_ETIMEDOUT); - nni_mtx_unlock(&sock->mtx); -} - static void surv0_ctx_send(void *arg, nni_aio *aio) { @@ -215,7 +205,6 @@ surv0_ctx_send(void *arg, nni_aio *aio) surv0_pipe * pipe; nni_msg * msg = nni_aio_get_msg(aio); size_t len = nni_msg_len(msg); - nni_time now = nni_clock(); nng_duration survey_time; int rv; @@ -229,7 +218,6 @@ surv0_ctx_send(void *arg, nni_aio *aio) // Abort everything outstanding. surv0_ctx_abort(ctx, NNG_ECANCELED); - nni_timer_cancel(&ctx->timer); // Allocate the new ID. if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) { @@ -258,8 +246,9 @@ surv0_ctx_send(void *arg, nni_aio *aio) } } - ctx->expire = now + survey_time; - nni_timer_schedule(&ctx->timer, ctx->expire); + // save the survey time, so we know the maximum timeout to use when + // waiting for receive + ctx->expire = nni_clock() + survey_time; nni_mtx_unlock(&sock->mtx); nni_msg_free(msg);