Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Jan 24, 2025
1 parent 4fadfee commit c605b0b
Showing 1 changed file with 73 additions and 99 deletions.
172 changes: 73 additions & 99 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,61 +59,61 @@ static struct aws_event_loop_vtable s_vtable = {
* 2. Process cross-thread tasks.
* 3. Execute all runnable tasks.
*
* Apple Dispatch queues are FIFO queues to which the application can submit tasks in the form of block objects, and the
* block objects will be executed on a system defined thread pool. Instead of executing the loop on a single thread, we
* tried to recurrently run a single iteration of the execution loop as a dispatch queue block object.
* aws-c-io library use a sequential dispatch queue to make sure the tasks scheduled on the same dispatch queue are
* executed in a strict execution order, though the tasks might be distributed on different threads in the thread pool.
* Apple Dispatch queues can be given a concurrent or serial attribute on creation. We use Serial Dispatch Queues that
* are FIFO queues to which the application can submit tasks in the form of block objects. The block objects will be
* executed on a system defined thread pool. Instead of executing the loop on a single thread, we recurrently run a
* single iteration of the execution loop as a dispatch queue block object. aws-c-io library uses a serial dispatch
* queue to make sure the tasks scheduled on the event loop task scheduler are executed in the correct order.
*
* Data Structures ******
* `dispatch_loop_context`: Context for each execution iteration
* `scheduled_service_entry`: Each entry maps to each iteration we scheduled on system dispatch queue. As we lost
* control of the submitted block on the system dispatch queue, the entry is what we used to track the context and user
* data.
* `scheduled_service_entry`: Each entry maps to an iteration we scheduled on Apple's dispatch queue. As we lose
* control of the submitted block once transferred to Apple's dispatch queue, this entry acquires a refcount on the
* dispatch_loop_context which checks whether it has a non-NULL dispatch_loop to run on.
* `dispatch_loop`: Implementation of the event loop for dispatch queue.
*
* Functions ************
* `s_run_iteration`: The function execute on each single iteration
* `begin_iteration`: Decide if we should run the iteration
* `end_iteration`: Clean up the related resource and determine if we should schedule next iteration
* `s_run_iteration`: This function executes scheduled tasks
* `s_end_iteration`: Clean up the related resource and determine if we should schedule next iteration
*
*/

/* The dispatch_scheduling_state holds required information to schedule a "block" on the dispatch_queue. */
/* Holds required information to schedule a "block" on the dispatch_queue. */
struct dispatch_scheduling_state {

/**
* The lock is used to protect the scheduled_services list cross threads. It should be hold while we add/remove
* entries from the scheduled_services list.
* The lock is used to protect the scheduled_services list cross threads. It must be held when we add or remove
* scheduled_service_entry entries from the scheduled_services list.
*/
struct aws_mutex services_lock;
/**
* priority queue of <scheduled_service_entry> in sorted order by timestamp. Each scheduled_service_entry represents
* a block ALREADY SCHEDULED on apple dispatch queue.
*
* When we go to schedule a new iteration, we check here first to see if our scheduling attempt is redundant.
* When we schedule a new run iteration, scheduled_services is checked to see if the scheduling attempt is
* redundant.
*/
struct aws_priority_queue scheduled_services;
};

/* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */
struct dispatch_loop_context {
/**
* The conetxt lock is a read-write lock used to protect dispatch_loop.
* The write lock will be acquired when we make changes to dispatch_loop. And the read lock will be acquired
* when we need verify if the dispatch_loop is alive. This makes sure that the dispatch_loop will not be destroyed
* from other thread while we are using it.
* This is a read-write lock used to protect dispatch_loop.
* The write lock needs to be acquired when we make changes to dispatch_loop. T
* The read lock needs to be acquired when we verify whether dispatch_loop is alive.
* This insures dispatch_loop will not be destroyed from other threads while we are using it.
*/
struct aws_allocator *allocator;
struct aws_rw_lock lock;
struct dispatch_loop *io_dispatch_loop;
struct dispatch_scheduling_state scheduling_state;
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
};

/**
* The data structure used to track the dispatch queue execution iteration (block). Each entry associated to an
* iteration scheduled on Apple Dispatch Queue.
* The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with
* an run iteration scheduled on Apple Dispatch Queue.
*/
struct scheduled_service_entry {
struct aws_allocator *allocator;
Expand Down Expand Up @@ -187,21 +187,6 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new(
return entry;
}

/**
* The function should be wrapped around scheduling_status->lock
*/
static void s_scheduled_service_entry_destroy(
struct dispatch_scheduling_state scheduling_status,
struct scheduled_service_entry *entry) {
if (aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)) {
aws_priority_queue_remove(&scheduling_status.scheduled_services, entry, &entry->priority_queue_node);
}
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
s_release_dispatch_loop_context(dispatch_queue_context);

aws_mem_release(entry->allocator, entry);
}

/**
* Helper function to check if another scheduled iteration already exists that will handle our needs
*
Expand All @@ -222,7 +207,7 @@ static bool s_should_schedule_iteration(
return (*entry)->timestamp > proposed_iteration_time;
}

/* On dispatch event loop context ref-count reaches 0 */
/* Called when dispatch_loop_context ref-count reaches 0 */
static void s_dispatch_loop_context_destroy(void *context) {
struct dispatch_loop_context *dispatch_loop_context = context;
aws_priority_queue_clean_up(&dispatch_loop_context->scheduling_state.scheduled_services);
Expand All @@ -231,10 +216,9 @@ static void s_dispatch_loop_context_destroy(void *context) {
aws_mem_release(dispatch_loop_context->allocator, dispatch_loop_context);
}

/* On dispatch event loop ref-count reaches 0 */
static void s_dispatch_event_loop_destroy(void *context) {
// Manually called to destroy an aws_event_loop
static void s_dispatch_event_loop_destroy(aws_event_loop *event_loop) {
// release dispatch loop
struct aws_event_loop *event_loop = context;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

if (dispatch_loop->context) {
Expand All @@ -245,8 +229,8 @@ static void s_dispatch_event_loop_destroy(void *context) {
s_release_dispatch_loop_context(dispatch_loop->context);
}

// The scheduler should be cleaned up and zero out in event loop destroy task. Double check here in case the destroy
// function is not called or initialize was failed.
// The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task.
// Double-check here in case the destroy function is not called or event loop initialization failed.
if (aws_task_scheduler_is_valid(&dispatch_loop->scheduler)) {
aws_task_scheduler_clean_up(&dispatch_loop->scheduler);
}
Expand Down Expand Up @@ -288,75 +272,71 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
AWS_PRECONDITION(options->clock);

struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop));
struct dispatch_loop *dispatch_loop = NULL;

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing dispatch_queue event-loop", (void *)loop);
if (aws_event_loop_init_base(loop, alloc, options->clock)) {
goto clean_up;
}

dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));
loop->vtable = &s_vtable;

struct dispatch_loop *dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));
dispatch_loop->allocator = alloc;
loop->impl_data = dispatch_loop;
dispatch_loop->base_loop = loop;

char dispatch_queue_id[AWS_IO_APPLE_DISPATCH_QUEUE_ID_LENGTH] = {0};
s_get_unique_dispatch_queue_id(dispatch_queue_id);

/*
* Apple API dispatch_queue_create returns a dispatch_queue_t. This cannot fail and will crash if it does.
* A reference to the dispatch queue is retained and must be released explicitly with dispatch_release().
*/
dispatch_loop->dispatch_queue = dispatch_queue_create(dispatch_queue_id, DISPATCH_QUEUE_SERIAL);
if (!dispatch_loop->dispatch_queue) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop);
aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
goto clean_up;
}

AWS_LOGF_INFO(
AWS_LS_IO_EVENT_LOOP, "id=%p: Apple dispatch queue created with id: %s", (void *)loop, dispatch_queue_id);

aws_mutex_init(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.is_executing = false;

int err = aws_task_scheduler_init(&dispatch_loop->scheduler, alloc);
if (err) {
AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing task scheduler failed", (void *)loop);
if (aws_task_scheduler_init(&dispatch_loop->scheduler, alloc)) {
AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Initialization of task scheduler failed", (void *)loop);
goto clean_up;
}

aws_linked_list_init(&dispatch_loop->synced_data.cross_thread_tasks);

struct dispatch_loop_context *context = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context));
struct dispatch_loop_context *dispatch_loop_context =
aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop_context));
aws_ref_count_init(&context->ref_count, dispatch_loop_context, s_dispatch_loop_context_destroy);
dispatch_loop_context->allocator = alloc;
dispatch_loop->context = dispatch_loop_context;
aws_rw_lock_init(&dispatch_loop_context->lock);
dispatch_loop_context->io_dispatch_loop = dispatch_loop;

aws_mutex_init(&dispatch_loop_context->scheduling_state.services_lock);
if (aws_priority_queue_init_dynamic(
&context->scheduling_state.scheduled_services,
&dispatch_loop_context->scheduling_state.scheduled_services,
alloc,
DEFAULT_QUEUE_SIZE,
sizeof(struct scheduled_service_entry *),
&s_compare_timestamps)) {
AWS_LOGF_INFO(
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: priority queue creation failed, clean up the context: %s",
"id=%p: Priority queue creation failed, cleaning up the dispatch queue: %s",
(void *)loop,
dispatch_queue_id);
aws_mem_release(alloc, context);
aws_mem_release(alloc, dispatch_loop_context);
goto clean_up;
};

aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy);
context->allocator = alloc;

aws_mutex_init(&context->scheduling_state.services_lock);

aws_rw_lock_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
dispatch_loop->context = context;

loop->vtable = &s_vtable;

return loop;

clean_up:
if (dispatch_loop) {
if (dispatch_loop->dispatch_queue) {
/* Apple API for releasing reference count on a dispatch object. */
dispatch_release(dispatch_loop->dispatch_queue);
}
s_dispatch_event_loop_destroy(loop);
Expand Down Expand Up @@ -452,37 +432,23 @@ static int s_stop(struct aws_event_loop *event_loop) {
return AWS_OP_SUCCESS;
}

/**
* The function decides if we should run this iteration.
* Returns true if we should execute an iteration, false otherwise
*
* The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running.
*/
static bool begin_iteration(struct scheduled_service_entry *entry) {
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;

if (!dispatch_loop) {
return false;
}
return true;
}

/**
* Clean up the related resource and determine if we should schedule next iteration.
* The function should be wrapped with dispatch_loop->context.lock to retain the dispatch loop while running.
* */
static void end_iteration(struct scheduled_service_entry *entry) {
static void s_end_iteration(struct scheduled_service_entry *entry) {

struct dispatch_loop_context *context = entry->dispatch_queue_context;
struct dispatch_loop *dispatch_loop = context->io_dispatch_loop;
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
struct dispatch_loop *dispatch_loop = dispatch_queue_context->io_dispatch_loop;

s_lock_cross_thread_data(dispatch_loop);
dispatch_loop->synced_data.is_executing = false;

// Remove the node before do scheduling so we didnt consider the entry itself
s_lock_service_entries(context);
aws_priority_queue_remove(&context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node);
s_unlock_service_entries(context);
s_lock_service_entries(dispatch_queue_context);
aws_priority_queue_remove(
&dispatch_queue_context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node);
s_unlock_service_entries(dispatch_queue_context);

bool should_schedule = false;
uint64_t should_schedule_at_time = 0;
Expand All @@ -495,26 +461,27 @@ static void end_iteration(struct scheduled_service_entry *entry) {
}

if (should_schedule) {
s_lock_service_entries(context);
s_try_schedule_new_iteration(context, should_schedule_at_time);
s_unlock_service_entries(context);
s_lock_service_entries(dispatch_queue_context);
s_try_schedule_new_iteration(dispatch_queue_context, should_schedule_at_time);
s_unlock_service_entries(dispatch_queue_context);
}

s_unlock_cross_thread_data(dispatch_loop);
}

// Iteration function that scheduled and executed by the Dispatch Queue API
static void s_run_iteration(void *context) {
struct scheduled_service_entry *entry = context;
static void s_run_iteration(void *service_entry) {
struct scheduled_service_entry *entry = service_entry;
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
s_acquire_dispatch_loop_context(dispatch_queue_context);
s_rlock_dispatch_loop_context(dispatch_queue_context);
struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;

if (!begin_iteration(entry)) {
// If the dispatch loop has been cleaned up, we ignore all scheduled tasks and return.
if (dispatch_loop == NULL) {
goto iteration_done;
}

struct dispatch_loop *dispatch_loop = entry->dispatch_queue_context->io_dispatch_loop;
// swap the cross-thread tasks into task-local data
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
Expand Down Expand Up @@ -545,12 +512,19 @@ static void s_run_iteration(void *context) {
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(dispatch_loop->base_loop);

end_iteration(entry);
s_end_iteration(entry);

iteration_done:
// destroy the completed service entry.
s_lock_service_entries(dispatch_queue_context);
s_scheduled_service_entry_destroy(dispatch_queue_context->scheduling_state, entry);
if (aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)) {
aws_priority_queue_remove(&scheduling_status.scheduled_services, entry, &entry->priority_queue_node);
}
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
s_release_dispatch_loop_context(dispatch_queue_context);
aws_mem_release(entry->allocator, entry);
s_unlock_service_entries(dispatch_queue_context);

s_runlock_dispatch_loop_context(dispatch_queue_context);
s_release_dispatch_loop_context(dispatch_queue_context);
}
Expand Down

0 comments on commit c605b0b

Please sign in to comment.