diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 478634e43..ea3f9f452 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -71,6 +72,10 @@ struct dispatch_loop { dispatch_queue_t dispatch_queue; struct aws_task_scheduler scheduler; struct aws_linked_list local_cross_thread_tasks; + aws_thread_id_t m_current_thread_id; + bool processing; + // Apple dispatch queue uses the id string to identify the dispatch queue + struct aws_string *dispatch_queue_id; struct { struct dispatch_scheduling_state scheduling_state; @@ -128,6 +133,7 @@ static void s_dispatch_event_loop_destroy(void *context) { AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop); aws_mutex_clean_up(&dispatch_loop->synced_data.lock); + aws_string_destroy(dispatch_loop->dispatch_queue_id); aws_mem_release(dispatch_loop->allocator, dispatch_loop); aws_event_loop_clean_up_base(event_loop); aws_mem_release(event_loop->alloc, event_loop); @@ -135,6 +141,28 @@ static void s_dispatch_event_loop_destroy(void *context) { aws_thread_decrement_unjoined_count(); } +/** Return a aws_string* with unique dispatch queue id string. The id is In format of + * "com.amazonaws.commonruntime.eventloop."*/ +static struct aws_string *s_get_unique_dispatch_queue_id(struct aws_allocator *alloc) { + struct aws_uuid uuid; + AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS); + char uuid_str[AWS_UUID_STR_LEN] = {0}; + struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str)); + uuid_buf.len = 0; + aws_uuid_to_str(&uuid, &uuid_buf); + struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf); + + struct aws_byte_buf dispatch_queue_id_buf; + aws_byte_buf_init_copy_from_cursor( + &dispatch_queue_id_buf, alloc, aws_byte_cursor_from_c_str("com.amazonaws.commonruntime.eventloop.")); + + aws_byte_buf_append_dynamic(&dispatch_queue_id_buf, &uuid_cursor); + + struct aws_string *result = aws_string_new_from_buf(alloc, &dispatch_queue_id_buf); + aws_byte_buf_clean_up(&dispatch_queue_id_buf); + return result; +} + /* Setup a dispatch_queue with a scheduler. */ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( struct aws_allocator *alloc, @@ -152,8 +180,10 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( struct dispatch_loop *dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop)); aws_ref_count_init(&dispatch_loop->ref_count, loop, s_dispatch_event_loop_destroy); + dispatch_loop->dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc); + dispatch_loop->dispatch_queue = - dispatch_queue_create("com.amazonaws.commonruntime.eventloop", DISPATCH_QUEUE_SERIAL); + dispatch_queue_create((char *)dispatch_loop->dispatch_queue_id->bytes, DISPATCH_QUEUE_SERIAL); if (!dispatch_loop->dispatch_queue) { AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop); aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); @@ -188,8 +218,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( if (dispatch_loop->dispatch_queue) { dispatch_release(dispatch_loop->dispatch_queue); } - - aws_mem_release(alloc, dispatch_loop); + aws_ref_count_release(&dispatch_loop->ref_count); aws_event_loop_clean_up_base(loop); clean_up_loop: @@ -202,6 +231,8 @@ static void s_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop); struct dispatch_loop *dispatch_loop = event_loop->impl_data; + dispatch_loop->m_current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->processing = true; /* make sure the loop is running so we can schedule a last task. */ s_run(event_loop); @@ -232,10 +263,10 @@ static void s_destroy(struct aws_event_loop *event_loop) { dispatch_loop->synced_data.suspended = true; aws_mutex_unlock(&dispatch_loop->synced_data.lock); - }); - /* we don't want it stopped while shutting down. dispatch_release will fail on a suspended loop. */ - dispatch_release(dispatch_loop->dispatch_queue); + dispatch_loop->m_current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->processing = false; + }); AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop); aws_ref_count_release(&dispatch_loop->ref_count); @@ -367,6 +398,9 @@ void run_iteration(void *context) { } } + dispatch_loop->m_current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->processing = true; + // run all scheduled tasks uint64_t now_ns = 0; aws_event_loop_current_clock_time(event_loop, &now_ns); @@ -374,6 +408,9 @@ void run_iteration(void *context) { aws_event_loop_register_tick_end(event_loop); end_iteration(entry); + + dispatch_loop->m_current_thread_id = aws_thread_current_thread_id(); + dispatch_loop->processing = false; } // Checks if a new iteration task needs to be scheduled, given a target timestamp @@ -412,11 +449,11 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws } } - aws_mutex_unlock(&dispatch_loop->synced_data.lock); - if (should_schedule) { try_schedule_new_iteration(event_loop, 0); } + + aws_mutex_unlock(&dispatch_loop->synced_data.lock); } static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) { @@ -461,6 +498,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc // tasks as cross thread tasks. Ignore the caller thread verification for apple // dispatch queue. static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) { - (void)event_loop; - return true; + struct dispatch_loop *dispatch_queue = event_loop->impl_data; + bool result = dispatch_queue->processing && + aws_thread_thread_id_equal(dispatch_queue->m_current_thread_id, aws_thread_current_thread_id()); + return result; }