Skip to content

Commit

Permalink
fix nw_socket state update workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Feb 3, 2025
1 parent 4becc61 commit b79b9a3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 78 deletions.
131 changes: 55 additions & 76 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,18 @@ static int s_unlock_socket_state(struct nw_socket *nw_socket) {
return aws_mutex_unlock(&nw_socket->synced_state.lock);
}

static inline void s_set_socket_state(struct nw_socket *nw_socket, struct aws_socket *socket, int state) {
s_lock_socket_state(nw_socket);
// The state can only go increasing, except for LISTENING and STOPPED. They can switch between each other.
if (nw_socket->synced_state.state < state || (state == LISTENING && nw_socket->synced_state.state == STOPPED)) {
nw_socket->synced_state.state = state;
if (socket) {
socket->state = state;
}
}
s_unlock_socket_state(nw_socket);
}

static int s_setup_socket_params(struct nw_socket *nw_socket, const struct aws_socket_options *options) {
if (options->type == AWS_SOCKET_STREAM) {
/* if TCP, setup all the tcp options */
Expand Down Expand Up @@ -462,10 +474,7 @@ int aws_socket_init_apple_nw_socket(
nw_socket->synced_data.base_socket = socket;
aws_mutex_unlock(&nw_socket->synced_data.lock);

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = INIT;
socket->state = INIT;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, INIT);

if (s_setup_socket_params(nw_socket, options)) {
aws_mem_release(alloc, nw_socket);
Expand Down Expand Up @@ -547,40 +556,40 @@ static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_t
bool data_queued = false;

if (status != AWS_TASK_STATUS_CANCELED) {
aws_mutex_lock(&nw_socket->synced_data.lock);
struct aws_socket *socket = nw_socket->synced_data.base_socket;

if (nw_socket->on_readable) {
// If data is valid, push it in read_queue. The read_queue should be only accessed in event loop, as the
// task is scheduled in event loop, it is fine to directly access it.
if (readable_args->data) {
struct read_queue_node *node = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct read_queue_node));
node->allocator = nw_socket->allocator;
node->received_data = readable_args->data;
aws_linked_list_push_back(&nw_socket->read_queue, &node->node);
data_queued = true;
}

// If data is valid, push it in read_queue. The read_queue should be only accessed in event loop, as the
// task is scheduled in event loop, it is fine to directly access it.
if (readable_args->data) {
struct read_queue_node *node = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct read_queue_node));
node->allocator = nw_socket->allocator;
node->received_data = readable_args->data;
aws_linked_list_push_back(&nw_socket->read_queue, &node->node);
data_queued = true;
}
aws_mutex_lock(&nw_socket->synced_data.lock);
struct aws_socket *socket = nw_socket->synced_data.base_socket;

if (socket) {
if (readable_args->error_code == AWS_IO_SOCKET_CLOSED) {
aws_socket_close(socket);
}
nw_socket->on_readable(socket, readable_args->error_code, nw_socket->on_readable_user_data);
if (socket && nw_socket->on_readable) {
if (readable_args->error_code == AWS_IO_SOCKET_CLOSED) {
aws_socket_close(socket);
}
nw_socket->on_readable(socket, readable_args->error_code, nw_socket->on_readable_user_data);
}
aws_mutex_unlock(&nw_socket->synced_data.lock);
}

if (!data_queued && readable_args->data) {
dispatch_release(readable_args->data);
}
nw_socket_release_internal_ref(nw_socket);

AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p: nw_socket_release_internal_ref: s_process_readable_task %lu",
(void *)nw_socket,
aws_atomic_load_int(&nw_socket->internal_ref_count.ref_count));
nw_socket_release_internal_ref(nw_socket);

aws_mem_release(readable_args->allocator, task);
aws_mem_release(readable_args->allocator, readable_args);
}
Expand Down Expand Up @@ -628,12 +637,13 @@ static void s_process_connection_result_task(struct aws_task *task, void *arg, e
aws_mutex_unlock(&nw_socket->synced_data.lock);
}

nw_socket_release_internal_ref(nw_socket);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p: nw_socket_release_internal_ref: s_process_connection_result_task %lu",
(void *)nw_socket,
aws_atomic_load_int(&nw_socket->internal_ref_count.ref_count));
nw_socket_release_internal_ref(nw_socket);

aws_mem_release(task_args->allocator, task);
aws_mem_release(task_args->allocator, task_args);
}
Expand Down Expand Up @@ -677,12 +687,13 @@ static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_tas
aws_event_loop_cancel_task(nw_socket->synced_data.event_loop, cancel_args->task_to_cancel);
}

nw_socket_release_internal_ref(nw_socket);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p: nw_socket_release_internal_ref: s_process_cancel_task %lu",
(void *)nw_socket,
aws_atomic_load_int(&nw_socket->internal_ref_count.ref_count));
nw_socket_release_internal_ref(nw_socket);

aws_mem_release(cancel_args->allocator, task);
aws_mem_release(cancel_args->allocator, cancel_args);
}
Expand Down Expand Up @@ -744,10 +755,7 @@ static void s_process_connection_state_changed_task(struct aws_task *task, void
nw_socket->synced_data.event_loop = NULL;
aws_mutex_unlock(&nw_socket->synced_data.lock);

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = CLOSED;
socket->state = CLOSED;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, CLOSED);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p: nw_socket_release_internal_ref: connection canceled %lu",
Expand Down Expand Up @@ -790,10 +798,7 @@ static void s_process_connection_state_changed_task(struct aws_task *task, void
s_schedule_cancel_task(nw_socket, &nw_socket->timeout_args->task);
}

s_lock_socket_state(nw_socket);
socket->state = CONNECTED_WRITE | CONNECTED_READ;
nw_socket->synced_state.state = CONNECTED_WRITE | CONNECTED_READ;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, CONNECTED_WRITE | CONNECTED_READ);

nw_socket->setup_run = true;
aws_ref_count_acquire(&nw_socket->ref_count);
Expand All @@ -817,12 +822,8 @@ static void s_process_connection_state_changed_task(struct aws_task *task, void
int error_code = s_determine_socket_error(connection_args->error);
nw_socket->last_error = error_code;
aws_raise_error(error_code);
s_lock_socket_state(nw_socket);
if (nw_socket->synced_state.state < CLOSING) {
nw_socket->synced_state.state = ERROR;
socket->state = ERROR;
}
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, ERROR);

if (!nw_socket->setup_run) {
s_schedule_on_connection_result(nw_socket, error_code);
nw_socket->setup_run = true;
Expand Down Expand Up @@ -857,12 +858,8 @@ static void s_process_connection_state_changed_task(struct aws_task *task, void
int error_code = s_determine_socket_error(connection_args->error);
nw_socket->last_error = error_code;
aws_raise_error(error_code);
s_lock_socket_state(nw_socket);
if (nw_socket->synced_state.state < CLOSING) {
nw_socket->synced_state.state = ERROR;
socket->state = ERROR;
}
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, ERROR);

if (!nw_socket->setup_run) {
s_schedule_on_connection_result(nw_socket, error_code);
nw_socket->setup_run = true;
Expand Down Expand Up @@ -994,10 +991,8 @@ static void s_process_listener_success_task(struct aws_task *task, void *args, e
new_nw_socket->setup_run = true;
new_nw_socket->currently_connected = true;
// Setup socket state to start read/write operations.
aws_mutex_lock(&new_nw_socket->synced_state.lock);
new_nw_socket->synced_state.state = CONNECTED_READ | CONNECTED_WRITE;
new_socket->state = CONNECTED_READ | CONNECTED_WRITE;
aws_mutex_unlock(&new_nw_socket->synced_state.lock);

s_set_socket_state(new_nw_socket, new_socket, CONNECTED_READ | CONNECTED_WRITE);

nw_connection_set_state_changed_handler(
new_socket->io_handle.data.handle, ^(nw_connection_state_t state, nw_error_t error) {
Expand Down Expand Up @@ -1253,10 +1248,7 @@ static int s_socket_connect_fn(
remote_endpoint->address,
(int)remote_endpoint->port);

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = CONNECTING;
socket->state = CONNECTING;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, CONNECTING);

socket->remote_endpoint = *remote_endpoint;
socket->connect_accept_user_data = user_data;
Expand Down Expand Up @@ -1417,10 +1409,7 @@ static int s_socket_bind_fn(struct aws_socket *socket, const struct aws_socket_e
nw_release(endpoint);

// Apple network framework requires connection besides bind.
s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = BOUND;
socket->state = BOUND;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, BOUND);

AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p: successfully bound", (void *)socket);

Expand Down Expand Up @@ -1458,10 +1447,9 @@ static int s_socket_listen_fn(struct aws_socket *socket, int backlog_size) {

if (!socket->io_handle.data.handle) {
AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p: listen failed with error code %d", (void *)socket, aws_last_error());
s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = ERROR;
socket->state = ERROR;
s_unlock_socket_state(nw_socket);

s_set_socket_state(nw_socket, socket, ERROR);

return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

Expand All @@ -1471,10 +1459,7 @@ static int s_socket_listen_fn(struct aws_socket *socket, int backlog_size) {
AWS_LOGF_INFO(
AWS_LS_IO_SOCKET, "id=%p handle=%p: successfully listening", (void *)socket, socket->io_handle.data.handle);

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = LISTENING;
socket->state = LISTENING;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, LISTENING);

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -1607,10 +1592,8 @@ static int s_socket_start_accept_fn(
aws_mutex_lock(&nw_socket->synced_data.lock);
nw_socket->synced_data.event_loop = NULL;
aws_mutex_unlock(&nw_socket->synced_data.lock);
s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = CLOSED;
socket->state = CLOSED;
s_unlock_socket_state(nw_socket);

s_set_socket_state(nw_socket, socket, CLOSED);

AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
Expand Down Expand Up @@ -1663,10 +1646,8 @@ static int s_socket_stop_accept_fn(struct aws_socket *socket) {

nw_listener_cancel(socket->io_handle.data.handle);

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = STOPPED;
socket->state = STOPPED;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, STOPPED);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -1709,10 +1690,8 @@ static int s_socket_close_fn(struct aws_socket *socket) {
}
}

s_lock_socket_state(nw_socket);
nw_socket->synced_state.state = CLOSING;
socket->state = CLOSING;
s_unlock_socket_state(nw_socket);
s_set_socket_state(nw_socket, socket, CLOSING);

s_socket_close_and_release(socket);

return AWS_OP_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions source/posix/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,7 @@ static int s_socket_close(struct aws_socket *socket) {
aws_mem_release(socket->allocator, write_request);
}
}

if (socket_impl->on_shutdown_complete) {
socket_impl->on_shutdown_complete(socket_impl->shutdown_user_data);
}
Expand Down
4 changes: 2 additions & 2 deletions tests/socket_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,9 +1048,9 @@ static int s_test_connect_timeout(struct aws_allocator *allocator, void *ctx) {

struct aws_socket outgoing;
ASSERT_SUCCESS(aws_socket_init(&outgoing, allocator, &options));
aws_socket_set_shutdown_callback(&outgoing, s_local_outgoing_connection_shutdown_complete, &outgoing_args);
ASSERT_SUCCESS(aws_socket_connect(&outgoing, &endpoint, event_loop, s_local_outgoing_connection, &outgoing_args));
aws_mutex_lock(&mutex);
aws_socket_set_shutdown_callback(&outgoing, s_local_outgoing_connection_shutdown_complete, &outgoing_args);
ASSERT_SUCCESS(aws_condition_variable_wait_pred(
&condition_variable, &mutex, s_connection_completed_predicate, &outgoing_args));
aws_mutex_unlock(&mutex);
Expand Down Expand Up @@ -1153,7 +1153,7 @@ static int s_test_connect_timeout_cancellation(struct aws_allocator *allocator,
ASSERT_SUCCESS(aws_socket_init(&outgoing, allocator, &options));
ASSERT_SUCCESS(aws_socket_connect(&outgoing, &endpoint, event_loop, s_local_outgoing_connection, &outgoing_args));

aws_socket_set_shutdown_callback(&outgoing, s_local_outgoing_connection_shutdown_complete, &outgoing_args);
aws_socket_set_cleanup_callback(&outgoing, s_local_outgoing_connection_shutdown_complete, &outgoing_args);
aws_event_loop_group_release(el_group);

aws_thread_join_all_managed();
Expand Down

0 comments on commit b79b9a3

Please sign in to comment.