Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always run channel setup callback on pinned event loop if one exists #618

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ static struct aws_event_loop *s_get_connection_event_loop(struct client_connecti
return aws_event_loop_group_get_next_loop(args->bootstrap->event_loop_group);
}

static void s_connection_args_setup_callback(
static void s_connect_args_setup_callback_safe(
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

AWS_FATAL_ASSERT(
(args->requested_event_loop == NULL) || aws_event_loop_thread_is_callers_thread(args->requested_event_loop));

/* setup_callback is always called exactly once */
AWS_FATAL_ASSERT(!args->setup_called);

Expand All @@ -200,6 +204,75 @@ static void s_connection_args_setup_callback(
s_client_connection_args_release(args);
}

struct aws_connection_args_setup_callback_task {
struct aws_allocator *allocator;
struct aws_task task;
struct client_connection_args *args;
int error_code;
struct aws_channel *channel;
};

static void s_aws_connection_args_setup_callback_task_delete(struct aws_connection_args_setup_callback_task *task) {
if (task == NULL) {
return;
}

s_client_connection_args_release(task->args);
if (task->channel) {
aws_channel_release_hold(task->channel);
}

aws_mem_release(task->allocator, task);
}

void s_aws_connection_args_setup_callback_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;

struct aws_connection_args_setup_callback_task *callback_task = arg;

if (status == AWS_TASK_STATUS_RUN_READY) {
s_connect_args_setup_callback_safe(callback_task->args, callback_task->error_code, callback_task->channel);
}

s_aws_connection_args_setup_callback_task_delete(callback_task);
}

static struct aws_connection_args_setup_callback_task *s_aws_connection_args_setup_callback_task_new(
struct aws_allocator *allocator,
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

struct aws_connection_args_setup_callback_task *task =
aws_mem_calloc(allocator, 1, sizeof(struct aws_connection_args_setup_callback_task));
task->allocator = allocator;
task->args = s_client_connection_args_acquire(args);
task->error_code = error_code;
task->channel = channel;
if (channel != NULL) {
aws_channel_acquire_hold(channel);
}

aws_task_init(
&task->task, s_aws_connection_args_setup_callback_task_fn, task, "safe connection args setup callback");

return task;
}

static void s_connection_args_setup_callback(
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

if (args->requested_event_loop == NULL || aws_event_loop_thread_is_callers_thread(args->requested_event_loop)) {
s_connect_args_setup_callback_safe(args, error_code, channel);
} else {
struct aws_connection_args_setup_callback_task *callback_task =
s_aws_connection_args_setup_callback_task_new(args->bootstrap->allocator, args, error_code, channel);
aws_event_loop_schedule_task_now(args->requested_event_loop, &callback_task->task);
}
}

static void s_connection_args_creation_callback(struct client_connection_args *args, struct aws_channel *channel) {

AWS_FATAL_ASSERT(channel != NULL);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ add_test_case(pem_sanitize_wrong_format_rejected)
add_test_case(socket_handler_echo_and_backpressure)
add_test_case(socket_handler_close)
add_test_case(socket_pinned_event_loop)
add_net_test_case(socket_pinned_event_loop_dns_failure)

if(NOT BYO_CRYPTO)
if(USE_S2N)
Expand Down
105 changes: 105 additions & 0 deletions tests/socket_handler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ struct socket_common_tester {
struct aws_event_loop_group *el_group;
struct aws_atomic_var current_time_ns;
struct aws_atomic_var stats_handler;

bool setup_called;
struct aws_event_loop *requested_callback_event_loop;
int setup_error_code;
};

static struct socket_common_tester c_tester;
Expand Down Expand Up @@ -399,6 +403,107 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void

AWS_TEST_CASE(socket_pinned_event_loop, s_socket_pinned_event_loop_test)

static void s_dns_failure_test_client_setup_callback(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {

(void)bootstrap;
(void)channel;

struct socket_common_tester *socket_tester = (struct socket_common_tester *)user_data;

aws_mutex_lock(&socket_tester->mutex);

socket_tester->setup_error_code = error_code;
socket_tester->setup_called = true;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(socket_tester->requested_callback_event_loop));
AWS_FATAL_ASSERT(channel == NULL);

aws_mutex_unlock(&socket_tester->mutex);
aws_condition_variable_notify_one(&socket_tester->condition_variable);
}

static void s_dns_failure_handler_test_client_shutdown_callback(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {

(void)error_code;
(void)bootstrap;
(void)channel;
(void)user_data;

// Should never be called
AWS_FATAL_ASSERT(false);
}

static bool s_dns_failure_channel_setup_predicate(void *user_data) {
struct socket_common_tester *socket_tester = (struct socket_common_tester *)user_data;
return socket_tester->setup_called;
}

static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

s_socket_common_tester_init(allocator, &c_tester);

struct aws_host_resolver_default_options resolver_options = {
.el_group = c_tester.el_group,
.max_entries = 8,
};
struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, &resolver_options);

struct aws_client_bootstrap_options client_bootstrap_options = {
.event_loop_group = c_tester.el_group,
.host_resolver = resolver,
};
struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options);
ASSERT_NOT_NULL(client_bootstrap);

struct aws_event_loop *pinned_event_loop = aws_event_loop_group_get_next_loop(c_tester.el_group);
c_tester.requested_callback_event_loop = pinned_event_loop;

struct aws_socket_options socket_options = {
.domain = AWS_SOCKET_IPV4,
.type = AWS_SOCKET_STREAM,
.connect_timeout_ms = 10000,
};

struct aws_socket_channel_bootstrap_options client_channel_options;
AWS_ZERO_STRUCT(client_channel_options);
client_channel_options.bootstrap = client_bootstrap;
client_channel_options.host_name = "notavalid.domain-seriously.uffda";
client_channel_options.port = 443;
client_channel_options.socket_options = &socket_options;
client_channel_options.setup_callback = s_dns_failure_test_client_setup_callback;
client_channel_options.shutdown_callback = s_dns_failure_handler_test_client_shutdown_callback;
client_channel_options.enable_read_back_pressure = false;
client_channel_options.requested_event_loop = pinned_event_loop;
client_channel_options.user_data = &c_tester;

ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options));

ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex));
ASSERT_SUCCESS(aws_condition_variable_wait_pred(
&c_tester.condition_variable, &c_tester.mutex, s_dns_failure_channel_setup_predicate, &c_tester));

/* Verify the setup callback failure was on the requested event loop */
ASSERT_TRUE(c_tester.setup_error_code != 0);

aws_mutex_unlock(&c_tester.mutex);

aws_client_bootstrap_release(client_bootstrap);
aws_host_resolver_release(resolver);
ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester));

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(socket_pinned_event_loop_dns_failure, s_socket_pinned_event_loop_dns_failure_test)

static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down