Skip to content

Commit

Permalink
continued wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Jan 29, 2025
1 parent eeaa8e6 commit a4998b2
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 47 deletions.
87 changes: 53 additions & 34 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ struct dispatch_scheduling_state {
* The lock is used to protect the scheduled_services list cross threads. It must be held when we add or remove
* scheduled_service_entry entries from the scheduled_services list.
*/
struct aws_mutex services_lock;
struct aws_mutex schedule_services_lock;
/**
* priority queue of <scheduled_service_entry> in sorted order by timestamp. Each scheduled_service_entry represents
* a block ALREADY SCHEDULED on apple dispatch queue.
Expand Down Expand Up @@ -149,20 +149,20 @@ static int s_wunlock_dispatch_loop_context(struct dispatch_loop_context *context
return aws_rw_lock_wunlock(&context->dispatch_loop_context_lock);
}

static int s_lock_cross_thread_data(struct dispatch_loop *loop) {
static int s_lock_synced_data(struct dispatch_loop *loop) {
return aws_mutex_lock(&loop->synced_data.synced_data_lock);
}

static int s_unlock_cross_thread_data(struct dispatch_loop *loop) {
static int s_unlock_synced_data(struct dispatch_loop *loop) {
return aws_mutex_unlock(&loop->synced_data.synced_data_lock);
}

static int s_lock_service_entries(struct dispatch_loop_context *context) {
return aws_mutex_lock(&context->scheduling_state.services_lock);
return aws_mutex_lock(&context->scheduling_state.schedule_services_lock);
}

static int s_unlock_service_entries(struct dispatch_loop_context *context) {
return aws_mutex_unlock(&context->scheduling_state.services_lock);
return aws_mutex_unlock(&context->scheduling_state.schedule_services_lock);
}

// Not sure why use 7 as the default queue size. Just follow what we used in task_scheduler.c
Expand Down Expand Up @@ -211,7 +211,7 @@ static bool s_should_schedule_iteration(
static void s_dispatch_loop_context_destroy(void *context) {
struct dispatch_loop_context *dispatch_loop_context = context;
aws_priority_queue_clean_up(&dispatch_loop_context->scheduling_state.scheduled_services);
aws_mutex_clean_up(&dispatch_loop_context->scheduling_state.services_lock);
aws_mutex_clean_up(&dispatch_loop_context->scheduling_state.schedule_services_lock);
aws_rw_lock_clean_up(&dispatch_loop_context->dispatch_loop_context_lock);
aws_mem_release(dispatch_loop_context->allocator, dispatch_loop_context);
}
Expand Down Expand Up @@ -295,12 +295,23 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
*/
dispatch_loop->dispatch_queue = dispatch_queue_create(dispatch_queue_id, DISPATCH_QUEUE_SERIAL);

/*
* Suspend will increase the dispatch reference count.
* A suspended dispatch queue must have dispatch_release() called on it for Apple to release the dispatch queue.
* We suspend the newly created Apple dispatch queue here to conform with other event loop types. A new event loop
* should start in a non-running state until run() is called.
*/
dispatch_suspend(dispatch_loop->dispatch_queue);

AWS_LOGF_INFO(
AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id);

aws_mutex_init(&dispatch_loop->synced_data.synced_data_lock);

/* The dispatch queue is both suspended and has no active blocks on it at this point*/
dispatch_loop->synced_data.stopped = true;
dispatch_loop->synced_data.suspended = true;
dispatch_loop->synced_data.is_executing = false;
dispatch_loop->synced_data.stopped = false;

if (aws_task_scheduler_init(&dispatch_loop->scheduler, alloc)) {
AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initialization of task scheduler failed", (void *)loop);
Expand All @@ -318,7 +329,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(

aws_rw_lock_init(&dispatch_loop_context->dispatch_loop_context_lock);

aws_mutex_init(&dispatch_loop_context->scheduling_state.services_lock);
aws_mutex_init(&dispatch_loop_context->scheduling_state.schedule_services_lock);
if (aws_priority_queue_init_dynamic(
&dispatch_loop_context->scheduling_state.scheduled_services,
alloc,
Expand Down Expand Up @@ -353,16 +364,16 @@ static void s_dispatch_queue_destroy_task(void *context) {
struct dispatch_loop *dispatch_loop = context;
s_rlock_dispatch_loop_context(dispatch_loop->context);

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_data.suspended = true;
s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.is_destroying = true;
dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_data.is_executing = true;

// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks);
s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);

aws_task_scheduler_clean_up(&dispatch_loop->scheduler); /* Tasks in scheduler get cancelled*/
while (!aws_linked_list_empty(&local_cross_thread_tasks)) {
Expand All @@ -371,9 +382,9 @@ static void s_dispatch_queue_destroy_task(void *context) {
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

s_lock_cross_thread_data(dispatch_loop);
s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.is_executing = false;
s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);

s_runlock_dispatch_loop_context(dispatch_loop->context);
s_dispatch_event_loop_destroy(dispatch_loop->base_loop);
Expand All @@ -386,7 +397,14 @@ static void s_destroy(struct aws_event_loop *event_loop) {
/* make sure the loop is running so we can schedule a last task. */
s_run(event_loop);

/* cancel outstanding tasks */
/*
* Schedules `s_dispatch_queue_destroy_task()` to run on the Apple dispatch queue of the event loop.
*
* Any block that is currently running or already scheduled on the dispatch queue will be completed before
* `s_dispatch_queue_destroy_task()`.
*
* `s_dispatch_queue_destroy_task()` will cancel outstanding tasks and then run `s_dispatch_event_loop_destroy()`
*/
dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task);

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
Expand All @@ -403,35 +421,34 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *loop, uin
static int s_run(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

s_lock_cross_thread_data(dispatch_loop);
if (dispatch_loop->synced_data.suspended || dispatch_loop->synced_data.stopped) {
s_lock_synced_data(dispatch_loop);
if (dispatch_loop->synced_data.suspended) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);
dispatch_resume(dispatch_loop->dispatch_queue);
dispatch_loop->synced_data.suspended = false;
dispatch_loop->synced_data.stopped = false;
s_rlock_dispatch_loop_context(dispatch_loop->context);
s_lock_service_entries(dispatch_loop->context);
s_try_schedule_new_iteration(dispatch_loop->context, 0);
s_unlock_service_entries(dispatch_loop->context);
s_runlock_dispatch_loop_context(dispatch_loop->context);
}
s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);

return AWS_OP_SUCCESS;
}

static int s_stop(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

s_lock_cross_thread_data(dispatch_loop);
s_lock_synced_data(dispatch_loop);
if (!dispatch_loop->synced_data.suspended) {
dispatch_loop->synced_data.suspended = true;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
/* Suspend will increase the dispatch reference count. It is required to call resume before
* releasing the dispatch queue. */
dispatch_suspend(dispatch_loop->dispatch_queue);
}
s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);

return AWS_OP_SUCCESS;
}
Expand All @@ -445,7 +462,7 @@ static void s_end_iteration(struct scheduled_service_entry *entry) {
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
struct dispatch_loop *dispatch_loop = dispatch_queue_context->io_dispatch_loop;

s_lock_cross_thread_data(dispatch_loop);
s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.is_executing = false;

// Remove the node before do scheduling so we didnt consider the entry itself
Expand All @@ -470,7 +487,7 @@ static void s_end_iteration(struct scheduled_service_entry *entry) {
s_unlock_service_entries(dispatch_queue_context);
}

s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);
}

// Iteration function that scheduled and executed by the Dispatch Queue API
Expand All @@ -489,11 +506,12 @@ static void s_run_iteration(void *service_entry) {
// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
s_lock_cross_thread_data(dispatch_loop);
s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.stopped = false;
dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_data.is_executing = true;
aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks);
s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);

aws_event_loop_register_tick_start(dispatch_loop->base_loop);

Expand Down Expand Up @@ -540,13 +558,13 @@ static void s_run_iteration(void *service_entry) {
* If timestamp==0, the function will always schedule a new iteration as long as the event loop is not suspended.
*
* The function should be wrapped with the following locks:
* dispatch_loop->context->lock: To retain the dispatch loop
* dispatch_loop->context->dispatch_loop_context_lock: To retain the dispatch loop
* dispatch_loop->synced_data.synced_data_lock : To verify if the dispatch loop is suspended
* dispatch_loop_context->scheduling_state->services_lock: To modify the scheduled_services list
* dispatch_loop_context->scheduling_state->schedule_services_lock: To modify the scheduled_services list
*/
static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_loop_context, uint64_t timestamp) {
struct dispatch_loop *dispatch_loop = dispatch_loop_context->io_dispatch_loop;
if (!dispatch_loop || dispatch_loop->synced_data.suspended) {
if (!dispatch_loop || dispatch_loop->synced_data.suspended || dispatch_loop->synced_data.is_destroying) {
return;
}
if (!s_should_schedule_iteration(&dispatch_loop_context->scheduling_state.scheduled_services, timestamp)) {
Expand Down Expand Up @@ -584,7 +602,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
if (dispatch_loop->context->io_dispatch_loop == NULL) {
goto schedule_task_common_cleanup;
}
s_lock_cross_thread_data(dispatch_loop);
s_lock_synced_data(dispatch_loop);
task->timestamp = run_at_nanos;

bool was_empty = aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks);
Expand Down Expand Up @@ -618,7 +636,7 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
s_unlock_service_entries(dispatch_loop->context);
}

s_unlock_cross_thread_data(dispatch_loop);
s_unlock_synced_data(dispatch_loop);
schedule_task_common_cleanup:
s_runlock_dispatch_loop_context(dispatch_loop->context);
}
Expand Down Expand Up @@ -662,15 +680,16 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
return AWS_OP_SUCCESS;
}

// The dispatch queue will assign the task block to threads, we will threat all
// tasks as cross thread tasks. Ignore the caller thread verification for apple
// dispatch queue.
/*
* We use aws_thread_id_equal with syched_data.current_thread_id and synced_data.is_executing to determine
* if operation is being executed on the same dispatch queue thread.
*/
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_queue = event_loop->impl_data;
s_lock_cross_thread_data(dispatch_queue);
s_lock_synced_data(dispatch_queue);
bool result =
dispatch_queue->synced_data.is_executing &&
aws_thread_thread_id_equal(dispatch_queue->synced_data.current_thread_id, aws_thread_current_thread_id());
s_unlock_cross_thread_data(dispatch_queue);
s_unlock_synced_data(dispatch_queue);
return result;
}
29 changes: 17 additions & 12 deletions source/darwin/dispatch_queue_event_loop_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,38 @@ struct dispatch_loop {
* synced_data struct is accessed or modified.
*/
struct aws_mutex synced_data_lock;

/*
* `is_executing` flag and `current_thread_id` together are used
* to identify the executing thread id for dispatch queue. See `static bool s_is_on_callers_thread(struct
* aws_event_loop *event_loop)` for details.
* `is_executing` flag and `current_thread_id` are used together to identify the thread id of the dispatch queue
* running the current block. See dispatch queue's `s_is_on_callers_thread()` implementation for details.
*/
bool is_executing;
aws_thread_id_t current_thread_id;

/*
* Set to true when `stop()` is called on event loop. Once suspended is set to true, underlying dispatch queue
* is set to suspend and event loop will no longer schedule any future service entries. If an iteration block is
* running it will continue till it finishes. `run()` must be called on a suspended dispatch queue event loop to
* schedule an iteration block.
* Will be true if dispatch queue is in a suspended state. A dispatch queue in a suspended state will not start
* any blocks that are already enqueued but will not prevent additional blocks from being queued.
*
* Set to true when `stop()` is called on event loop.
* `run()` must be called on owning event_loop to resume processing of blocks on a suspended dispatch queue.
*
* Calling dispatch_sync() on a suspended dispatch queue will deadlock.
*/
bool suspended;

/*
* Will be true when the underlying dispatch_queue is both suspended and has completed running any in progress
* iteration block. `run()` must be called to resume the event loop and its underlying dispatch queue to
* schedule an iteration block.
* Will be true when the underlying dispatch_queue has completed running all enqueued blocks and no futher work
* is enqueued.
*/
bool stopped;

/*
* Will be true when dispatch loop has entered state where it is being destroyed.
*/
bool is_destroying;

struct aws_linked_list cross_thread_tasks;
} synced_data;

bool is_destroying;
};

#endif /* #ifndef AWS_IO_DARWIN_DISPATCH_QUEUE_H */
3 changes: 2 additions & 1 deletion source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#ifdef AWS_USE_APPLE_NETWORK_FRAMEWORK
static enum aws_event_loop_type s_default_event_loop_type_override = AWS_EVENT_LOOP_DISPATCH_QUEUE;
#else
static enum aws_event_loop_type s_default_event_loop_type_override = AWS_EVENT_LOOP_PLATFORM_DEFAULT;
// DEBUG WIP CHANGE THIS BACK TO AWS_EVENT_LOOP_PLATFORM_DEFAULT
static enum aws_event_loop_type s_default_event_loop_type_override = AWS_EVENT_LOOP_DISPATCH_QUEUE;
#endif

struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, aws_io_clock_fn *clock) {
Expand Down

0 comments on commit a4998b2

Please sign in to comment.