diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 9b7a31f38..f3423fba9 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -59,39 +59,39 @@ static struct aws_event_loop_vtable s_vtable = { * 2. Process cross-thread tasks. * 3. Execute all runnable tasks. * - * Apple Dispatch queues are FIFO queues to which the application can submit tasks in the form of block objects, and the - * block objects will be executed on a system defined thread pool. Instead of executing the loop on a single thread, we - * tried to recurrently run a single iteration of the execution loop as a dispatch queue block object. - * aws-c-io library use a sequential dispatch queue to make sure the tasks scheduled on the same dispatch queue are - * executed in a strict execution order, though the tasks might be distributed on different threads in the thread pool. + * Apple Dispatch queues can be given a concurrent or serial attribute on creation. We use Serial Dispatch Queues that + * are FIFO queues to which the application can submit tasks in the form of block objects. The block objects will be + * executed on a system defined thread pool. Instead of executing the loop on a single thread, we recurrently run a + * single iteration of the execution loop as a dispatch queue block object. aws-c-io library uses a serial dispatch + * queue to make sure the tasks scheduled on the event loop task scheduler are executed in the correct order. * * Data Structures ****** * `dispatch_loop_context`: Context for each execution iteration - * `scheduled_service_entry`: Each entry maps to each iteration we scheduled on system dispatch queue. As we lost - * control of the submitted block on the system dispatch queue, the entry is what we used to track the context and user - * data. + * `scheduled_service_entry`: Each entry maps to an iteration we scheduled on Apple's dispatch queue. As we lose + * control of the submitted block once transferred to Apple's dispatch queue, this entry acquires a refcount on the + * dispatch_loop_context which checks whether it has a non-NULL dispatch_loop to run on. * `dispatch_loop`: Implementation of the event loop for dispatch queue. * * Functions ************ - * `s_run_iteration`: The function execute on each single iteration - * `begin_iteration`: Decide if we should run the iteration - * `end_iteration`: Clean up the related resource and determine if we should schedule next iteration + * `s_run_iteration`: This function executes scheduled tasks + * `s_end_iteration`: Clean up the related resource and determine if we should schedule next iteration * */ -/* The dispatch_scheduling_state holds required information to schedule a "block" on the dispatch_queue. */ +/* Holds required information to schedule a "block" on the dispatch_queue. */ struct dispatch_scheduling_state { /** - * The lock is used to protect the scheduled_services list cross threads. It should be hold while we add/remove - * entries from the scheduled_services list. + * The lock is used to protect the scheduled_services list cross threads. It must be held when we add or remove + * scheduled_service_entry entries from the scheduled_services list. */ struct aws_mutex services_lock; /** * priority queue of in sorted order by timestamp. Each scheduled_service_entry represents * a block ALREADY SCHEDULED on apple dispatch queue. * - * When we go to schedule a new iteration, we check here first to see if our scheduling attempt is redundant. + * When we schedule a new run iteration, scheduled_services is checked to see if the scheduling attempt is + * redundant. */ struct aws_priority_queue scheduled_services; }; @@ -99,21 +99,21 @@ struct dispatch_scheduling_state { /* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */ struct dispatch_loop_context { /** - * The conetxt lock is a read-write lock used to protect dispatch_loop. - * The write lock will be acquired when we make changes to dispatch_loop. And the read lock will be acquired - * when we need verify if the dispatch_loop is alive. This makes sure that the dispatch_loop will not be destroyed - * from other thread while we are using it. + * This is a read-write lock used to protect dispatch_loop. + * The write lock needs to be acquired when we make changes to dispatch_loop. T + * The read lock needs to be acquired when we verify whether dispatch_loop is alive. + * This insures dispatch_loop will not be destroyed from other threads while we are using it. */ + struct aws_allocator *allocator; struct aws_rw_lock lock; struct dispatch_loop *io_dispatch_loop; struct dispatch_scheduling_state scheduling_state; - struct aws_allocator *allocator; struct aws_ref_count ref_count; }; /** - * The data structure used to track the dispatch queue execution iteration (block). Each entry associated to an - * iteration scheduled on Apple Dispatch Queue. + * The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with + * an run iteration scheduled on Apple Dispatch Queue. */ struct scheduled_service_entry { struct aws_allocator *allocator; @@ -187,21 +187,6 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new( return entry; } -/** - * The function should be wrapped around scheduling_status->lock - */ -static void s_scheduled_service_entry_destroy( - struct dispatch_scheduling_state scheduling_status, - struct scheduled_service_entry *entry) { - if (aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)) { - aws_priority_queue_remove(&scheduling_status.scheduled_services, entry, &entry->priority_queue_node); - } - struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; - s_release_dispatch_loop_context(dispatch_queue_context); - - aws_mem_release(entry->allocator, entry); -} - /** * Helper function to check if another scheduled iteration already exists that will handle our needs * @@ -222,7 +207,7 @@ static bool s_should_schedule_iteration( return (*entry)->timestamp > proposed_iteration_time; } -/* On dispatch event loop context ref-count reaches 0 */ +/* Called when dispatch_loop_context ref-count reaches 0 */ static void s_dispatch_loop_context_destroy(void *context) { struct dispatch_loop_context *dispatch_loop_context = context; aws_priority_queue_clean_up(&dispatch_loop_context->scheduling_state.scheduled_services); @@ -231,10 +216,9 @@ static void s_dispatch_loop_context_destroy(void *context) { aws_mem_release(dispatch_loop_context->allocator, dispatch_loop_context); } -/* On dispatch event loop ref-count reaches 0 */ -static void s_dispatch_event_loop_destroy(void *context) { +// Manually called to destroy an aws_event_loop +static void s_dispatch_event_loop_destroy(aws_event_loop *event_loop) { // release dispatch loop - struct aws_event_loop *event_loop = context; struct dispatch_loop *dispatch_loop = event_loop->impl_data; if (dispatch_loop->context) { @@ -245,8 +229,8 @@ static void s_dispatch_event_loop_destroy(void *context) { s_release_dispatch_loop_context(dispatch_loop->context); } - // The scheduler should be cleaned up and zero out in event loop destroy task. Double check here in case the destroy - // function is not called or initialize was failed. + // The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task. + // Double-check here in case the destroy function is not called or event loop initialization failed. if (aws_task_scheduler_is_valid(&dispatch_loop->scheduler)) { aws_task_scheduler_clean_up(&dispatch_loop->scheduler); } @@ -288,14 +272,15 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( AWS_PRECONDITION(options->clock); struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop)); - struct dispatch_loop *dispatch_loop = NULL; AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing dispatch_queue event-loop", (void *)loop); if (aws_event_loop_init_base(loop, alloc, options->clock)) { goto clean_up; } - dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop)); + loop->vtable = &s_vtable; + + struct dispatch_loop *dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop)); dispatch_loop->allocator = alloc; loop->impl_data = dispatch_loop; dispatch_loop->base_loop = loop; @@ -303,12 +288,11 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0}; s_get_unique_dispatch_queue_id(dispatch_queue_id); + /* + * Apple API dispatch_queue_create returns a dispatch_queue_t. This cannot fail and will crash if it does. + * A reference to the dispatch queue is retained and must be released explicitly with dispatch_release(). + */ dispatch_loop->dispatch_queue = dispatch_queue_create(dispatch_queue_id, DISPATCH_QUEUE_SERIAL); - if (!dispatch_loop->dispatch_queue) { - AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop); - aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); - goto clean_up; - } AWS_LOGF_INFO( AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id); @@ -316,47 +300,43 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( aws_mutex_init(&dispatch_loop->synced_data.lock); dispatch_loop->synced_data.is_executing = false; - int err = aws_task_scheduler_init(&dispatch_loop->scheduler, alloc); - if (err) { - AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing task scheduler failed", (void *)loop); + if (aws_task_scheduler_init(&dispatch_loop->scheduler, alloc)) { + AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initialization of task scheduler failed", (void *)loop); goto clean_up; } aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks); - struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context)); + struct dispatch_loop_context *dispatch_loop_context = + aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context)); + aws_ref_count_init(&context->ref_count, dispatch_loop_context, s_dispatch_loop_context_destroy); + dispatch_loop_context->allocator = alloc; + dispatch_loop->context = dispatch_loop_context; + aws_rw_lock_init(&dispatch_loop_context->lock); + dispatch_loop_context->io_dispatch_loop = dispatch_loop; + aws_mutex_init(&dispatch_loop_context->scheduling_state.services_lock); if (aws_priority_queue_init_dynamic( - &context->scheduling_state.scheduled_services, + &dispatch_loop_context->scheduling_state.scheduled_services, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct scheduled_service_entry *), &s_compare_timestamps)) { - AWS_LOGF_INFO( + AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, - "id=%p: priority queue creation failed, clean up the context: %s", + "id=%p: Priority queue creation failed, cleaning up the dispatch queue: %s", (void *)loop, dispatch_queue_id); - aws_mem_release(alloc, context); + aws_mem_release(alloc, dispatch_loop_context); goto clean_up; }; - aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy); - context->allocator = alloc; - - aws_mutex_init(&context->scheduling_state.services_lock); - - aws_rw_lock_init(&context->lock); - context->io_dispatch_loop = dispatch_loop; - dispatch_loop->context = context; - - loop->vtable = &s_vtable; - return loop; clean_up: if (dispatch_loop) { if (dispatch_loop->dispatch_queue) { + /* Apple API for releasing reference count on a dispatch object. */ dispatch_release(dispatch_loop->dispatch_queue); } s_dispatch_event_loop_destroy(loop); @@ -452,37 +432,23 @@ static int s_stop(struct aws_event_loop *event_loop) { return AWS_OP_SUCCESS; } -/** - * The function decides if we should run this iteration. - * Returns true if we should execute an iteration, false otherwise - * - * The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running. - */ -static bool begin_iteration(struct scheduled_service_entry *entry) { - struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; - - if (!dispatch_loop) { - return false; - } - return true; -} - /** * Clean up the related resource and determine if we should schedule next iteration. * The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running. * */ -static void end_iteration(struct scheduled_service_entry *entry) { +static void s_end_iteration(struct scheduled_service_entry *entry) { - struct dispatch_loop_context *context = entry->dispatch_queue_context; - struct dispatch_loop *dispatch_loop = context->io_dispatch_loop; + struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; + struct dispatch_loop *dispatch_loop = dispatch_queue_context->io_dispatch_loop; s_lock_cross_thread_data(dispatch_loop); dispatch_loop->synced_data.is_executing = false; // Remove the node before do scheduling so we didnt consider the entry itself - s_lock_service_entries(context); - aws_priority_queue_remove(&context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node); - s_unlock_service_entries(context); + s_lock_service_entries(dispatch_queue_context); + aws_priority_queue_remove( + &dispatch_queue_context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node); + s_unlock_service_entries(dispatch_queue_context); bool should_schedule = false; uint64_t should_schedule_at_time = 0; @@ -495,26 +461,27 @@ static void end_iteration(struct scheduled_service_entry *entry) { } if (should_schedule) { - s_lock_service_entries(context); - s_try_schedule_new_iteration(context, should_schedule_at_time); - s_unlock_service_entries(context); + s_lock_service_entries(dispatch_queue_context); + s_try_schedule_new_iteration(dispatch_queue_context, should_schedule_at_time); + s_unlock_service_entries(dispatch_queue_context); } s_unlock_cross_thread_data(dispatch_loop); } // Iteration function that scheduled and executed by the Dispatch Queue API -static void s_run_iteration(void *context) { - struct scheduled_service_entry *entry = context; +static void s_run_iteration(void *service_entry) { + struct scheduled_service_entry *entry = service_entry; struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; s_acquire_dispatch_loop_context(dispatch_queue_context); s_rlock_dispatch_loop_context(dispatch_queue_context); + struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; - if (!begin_iteration(entry)) { + // If the dispatch loop has been cleaned up, we ignore all scheduled tasks and return. + if (dispatch_loop == NULL) { goto iteration_done; } - struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop; // swap the cross-thread tasks into task-local data struct aws_linked_list local_cross_thread_tasks; aws_linked_list_init(&local_cross_thread_tasks); @@ -545,12 +512,19 @@ static void s_run_iteration(void *context) { aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns); aws_event_loop_register_tick_end(dispatch_loop->base_loop); - end_iteration(entry); + s_end_iteration(entry); iteration_done: + // destroy the completed service entry. s_lock_service_entries(dispatch_queue_context); - s_scheduled_service_entry_destroy(dispatch_queue_context->scheduling_state, entry); + if (aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)) { + aws_priority_queue_remove(&scheduling_status.scheduled_services, entry, &entry->priority_queue_node); + } + struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context; + s_release_dispatch_loop_context(dispatch_queue_context); + aws_mem_release(entry->allocator, entry); s_unlock_service_entries(dispatch_queue_context); + s_runlock_dispatch_loop_context(dispatch_queue_context); s_release_dispatch_loop_context(dispatch_queue_context); }