Skip to content

Commit

Permalink
Merge branch 'grand_dispatch_queue' of https://github.com/awslabs/aws…
Browse files Browse the repository at this point in the history
…-c-io into nw_socket_pull_dispatch_queue
  • Loading branch information
xiazhvera committed Sep 26, 2024
2 parents acc5be1 + 06fb206 commit 44d17eb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
10 changes: 6 additions & 4 deletions include/aws/io/private/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@ struct dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
aws_thread_id_t m_current_thread_id;
bool processing;

// Apple dispatch queue uses the id string to identify the dispatch queue
struct aws_string* dispatch_queue_id;
struct aws_string *dispatch_queue_id;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
struct aws_mutex lock;
bool suspended;
// `is_executing` flag and `current_thread_id` together are used to identify the excuting thread id for dispatch queue.
bool is_executing;
aws_thread_id_t current_thread_id;
} synced_data;

bool wakeup_schedule_needed;
Expand Down
29 changes: 17 additions & 12 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop);

struct dispatch_loop *dispatch_loop = event_loop->impl_data;
dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;

/* make sure the loop is running so we can schedule a last task. */
s_run(event_loop);
Expand All @@ -238,6 +236,9 @@ static void s_destroy(struct aws_event_loop *event_loop) {
aws_task_scheduler_clean_up(&dispatch_loop->scheduler);

aws_mutex_lock(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_data.is_executing = true;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);
while (!aws_linked_list_empty(&dispatch_loop->synced_data.cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&dispatch_loop->synced_data.cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
Expand All @@ -257,11 +258,10 @@ static void s_destroy(struct aws_event_loop *event_loop) {
scheduled_service_entry_destroy(entry);
}

aws_mutex_lock(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.suspended = true;
dispatch_loop->synced_data.is_executing = false;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
});

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
Expand Down Expand Up @@ -400,19 +400,22 @@ void run_iteration(void *context) {
}
}

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;
aws_mutex_lock(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_data.is_executing = true;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);

// run all scheduled tasks
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(event_loop, &now_ns);
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(event_loop);

end_iteration(entry);
aws_mutex_lock(&dispatch_loop->synced_data.lock);
dispatch_loop->synced_data.is_executing = false;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
end_iteration(entry);
}

// Checks if a new iteration task needs to be scheduled, given a target timestamp
Expand Down Expand Up @@ -501,7 +504,9 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
// dispatch queue.
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct dispatch_loop *dispatch_queue = event_loop->impl_data;
bool result = dispatch_queue->processing &&
aws_thread_thread_id_equal(dispatch_queue->m_current_thread_id, aws_thread_current_thread_id());
aws_mutex_lock(&dispatch_queue->synced_data.lock);
bool result = dispatch_queue->synced_data.is_executing &&
aws_thread_thread_id_equal(dispatch_queue->synced_data.current_thread_id, aws_thread_current_thread_id());
aws_mutex_unlock(&dispatch_queue->synced_data.lock);
return result;
}

0 comments on commit 44d17eb

Please sign in to comment.