Skip to content

Commit

Permalink
fixes #1728 surveyor could be simplified to not use timer
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 17, 2023
1 parent 9f34ec0 commit dc49988
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 56 deletions.
40 changes: 28 additions & 12 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
void *arg;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

// This is like aio_close, but we don't want to dispatch
Expand Down Expand Up @@ -247,7 +247,21 @@ nni_aio_close(nni_aio *aio)
void
nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
aio->a_timeout = when;
aio->a_timeout = when;
aio->a_use_expire = false;
}

void
nni_aio_set_expire(nni_aio *aio, nni_time expire)
{
aio->a_expire = expire;
aio->a_use_expire = true;
}

nng_duration
nni_aio_get_timeout(nni_aio *aio)
{
return (aio->a_timeout);
}

void
Expand Down Expand Up @@ -369,7 +383,7 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
nni_aio_expire_q *eq = aio->a_expire_q;

if (!aio->a_sleep) {
if ((!aio->a_sleep) && (!aio->a_use_expire)) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
Expand Down Expand Up @@ -411,7 +425,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
void *arg;
void *arg;
nni_aio_expire_q *eq = aio->a_expire_q;

nni_mtx_lock(&eq->eq_mtx);
Expand Down Expand Up @@ -447,8 +461,9 @@ nni_aio_finish_impl(
aio->a_msg = msg;
}

aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_use_expire = false;
nni_mtx_unlock(&eq->eq_mtx);

if (sync) {
Expand Down Expand Up @@ -518,24 +533,25 @@ nni_aio_completions_init(nni_aio_completions *clp)
}

void
nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
nni_aio_completions_add(
nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
{
NNI_ASSERT(!nni_aio_list_active(aio));
aio->a_reap_node.rn_next = *clp;
aio->a_result = result;
aio->a_count = count;
*clp = aio;
aio->a_result = result;
aio->a_count = count;
*clp = aio;
}

void
nni_aio_completions_run(nni_aio_completions *clp)
{
nni_aio *aio;
nni_aio *cl = *clp;
*clp = NULL;
*clp = NULL;

while ((aio = cl) != NULL) {
cl = (void *)aio->a_reap_node.rn_next;
cl = (void *) aio->a_reap_node.rn_next;
aio->a_reap_node.rn_next = NULL;
nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
}
Expand Down
35 changes: 19 additions & 16 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ extern size_t nni_aio_iov_count(nni_aio *);

extern int nni_aio_set_iov(nni_aio *, unsigned, const nni_iov *);

extern void nni_aio_set_timeout(nni_aio *, nng_duration);
extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);
extern void nni_aio_set_timeout(nni_aio *, nng_duration);
extern void nni_aio_set_expire(nni_aio *, nni_time);
extern nng_duration nni_aio_get_timeout(nni_aio *);
extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);

// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asynchronous completion. This also starts the expiration timer. Note that
Expand Down Expand Up @@ -187,8 +189,8 @@ extern void nni_aio_completions_run(nni_aio_completions *);
// nni_aio_completions_add adds an aio (with the result code and length as
// appropriate) to the completion list. This should be done while the
// appropriate lock is held. The aio must not be scheduled.
extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
int, size_t);
extern void nni_aio_completions_add(
nni_aio_completions *, nni_aio *, int, size_t);

extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
Expand All @@ -202,14 +204,15 @@ typedef struct nni_aio_expire_q nni_aio_expire_q;
// any of these members -- the definition is provided here to facilitate
// inlining, but that should be the only use.
struct nng_aio {
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
size_t a_count; // Bytes transferred (I/O only)
nni_time a_expire; // Absolute timeout
nni_duration a_timeout; // Relative timeout
int a_result; // Result code (nng_errno)
bool a_stop; // Shutting down (no new operations)
bool a_sleep; // Sleeping with no action
bool a_expire_ok; // Expire from sleep is ok
bool a_expiring; // Expiration in progress
bool a_use_expire; // Use expire instead of timeout
nni_task a_task;

// Read/write operations.
Expand All @@ -227,8 +230,8 @@ struct nng_aio {

// Provider-use fields.
nni_aio_cancel_fn a_cancel_fn;
void *a_cancel_arg;
void *a_prov_data;
void *a_cancel_arg;
void *a_prov_data;
nni_list_node a_prov_node; // Linkage on provider list.
nni_aio_expire_q *a_expire_q;
nni_list_node a_expire_node; // Expiration node
Expand Down
45 changes: 17 additions & 28 deletions src/sp/protocol/survey0/survey.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -25,17 +25,15 @@ typedef struct surv0_ctx surv0_ctx;

static void surv0_pipe_send_cb(void *);
static void surv0_pipe_recv_cb(void *);
static void surv0_ctx_timeout(void *);

struct surv0_ctx {
surv0_sock * sock;
uint32_t survey_id; // survey id
nni_timer_node timer;
nni_time expire;
nni_lmq recv_lmq;
nni_list recv_queue;
nni_atomic_int recv_buf;
nni_atomic_int survey_time;
nni_time expire;
int err;
};

Expand Down Expand Up @@ -99,7 +97,6 @@ surv0_ctx_fini(void *arg)
surv0_ctx *ctx = arg;

surv0_ctx_close(ctx);
nni_timer_cancel(&ctx->timer);
nni_lmq_fini(&ctx->recv_lmq);
}

Expand Down Expand Up @@ -129,7 +126,6 @@ surv0_ctx_init(void *c, void *s)
ctx->sock = sock;

nni_lmq_init(&ctx->recv_lmq, len);
nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
}

static void
Expand All @@ -155,17 +151,28 @@ surv0_ctx_recv(void *arg, nni_aio *aio)
surv0_ctx * ctx = arg;
surv0_sock *sock = ctx->sock;
nni_msg * msg;
nni_time now;
nni_duration timeout;

if (nni_aio_begin(aio) != 0) {
return;
}

now = nni_clock();

nni_mtx_lock(&sock->mtx);
if (ctx->survey_id == 0) {
if ((ctx->survey_id == 0) || (now >= ctx->expire)) {
nni_mtx_unlock(&sock->mtx);
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}

timeout = nni_aio_get_timeout(aio);
if ((timeout < 1) || ((now + timeout) > ctx->expire)) {
// limit the timeout to the survey time
nni_aio_set_expire(aio, ctx->expire);
}

again:
if (nni_lmq_get(&ctx->recv_lmq, &msg) != 0) {
int rv;
Expand All @@ -190,23 +197,6 @@ surv0_ctx_recv(void *arg, nni_aio *aio)
nni_aio_finish_msg(aio, msg);
}

void
surv0_ctx_timeout(void *arg)
{
surv0_ctx * ctx = arg;
surv0_sock *sock = ctx->sock;

nni_mtx_lock(&sock->mtx);
if (nni_clock() < ctx->expire) {
nni_mtx_unlock(&sock->mtx);
return;
}

// Abort any pending receives.
surv0_ctx_abort(ctx, NNG_ETIMEDOUT);
nni_mtx_unlock(&sock->mtx);
}

static void
surv0_ctx_send(void *arg, nni_aio *aio)
{
Expand All @@ -215,7 +205,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)
surv0_pipe * pipe;
nni_msg * msg = nni_aio_get_msg(aio);
size_t len = nni_msg_len(msg);
nni_time now = nni_clock();
nng_duration survey_time;
int rv;

Expand All @@ -229,7 +218,6 @@ surv0_ctx_send(void *arg, nni_aio *aio)

// Abort everything outstanding.
surv0_ctx_abort(ctx, NNG_ECANCELED);
nni_timer_cancel(&ctx->timer);

// Allocate the new ID.
if ((rv = nni_id_alloc(&sock->surveys, &ctx->survey_id, ctx)) != 0) {
Expand Down Expand Up @@ -258,8 +246,9 @@ surv0_ctx_send(void *arg, nni_aio *aio)
}
}

ctx->expire = now + survey_time;
nni_timer_schedule(&ctx->timer, ctx->expire);
// save the survey time, so we know the maximum timeout to use when
// waiting for receive
ctx->expire = nni_clock() + survey_time;

nni_mtx_unlock(&sock->mtx);
nni_msg_free(msg);
Expand Down

0 comments on commit dc49988

Please sign in to comment.