Skip to content

Commit

Permalink
improve dispatch caller's thread check
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 26, 2024
1 parent 5ab8f24 commit 0918e76
Showing 1 changed file with 49 additions and 10 deletions.
59 changes: 49 additions & 10 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <aws/common/atomics.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/uuid.h>

#include <aws/io/logging.h>

Expand Down Expand Up @@ -71,6 +72,10 @@ struct dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
aws_thread_id_t m_current_thread_id;
bool processing;
// Apple dispatch queue uses the id string to identify the dispatch queue
struct aws_string *dispatch_queue_id;

struct {
struct dispatch_scheduling_state scheduling_state;
Expand Down Expand Up @@ -128,13 +133,36 @@ static void s_dispatch_event_loop_destroy(void *context) {
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop);

aws_mutex_clean_up(&dispatch_loop->synced_data.lock);
aws_string_destroy(dispatch_loop->dispatch_queue_id);
aws_mem_release(dispatch_loop->allocator, dispatch_loop);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);

aws_thread_decrement_unjoined_count();
}

/** Return a aws_string* with unique dispatch queue id string. The id is In format of
* "com.amazonaws.commonruntime.eventloop.<UUID>"*/
static struct aws_string *s_get_unique_dispatch_queue_id(struct aws_allocator *alloc) {
struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);
char uuid_str[AWS_UUID_STR_LEN] = {0};
struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str));
uuid_buf.len = 0;
aws_uuid_to_str(&uuid, &uuid_buf);
struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf);

struct aws_byte_buf dispatch_queue_id_buf;
aws_byte_buf_init_copy_from_cursor(
&dispatch_queue_id_buf, alloc, aws_byte_cursor_from_c_str("com.amazonaws.commonruntime.eventloop."));

aws_byte_buf_append_dynamic(&dispatch_queue_id_buf, &uuid_cursor);

struct aws_string *result = aws_string_new_from_buf(alloc, &dispatch_queue_id_buf);
aws_byte_buf_clean_up(&dispatch_queue_id_buf);
return result;
}

/* Setup a dispatch_queue with a scheduler. */
struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
struct aws_allocator *alloc,
Expand All @@ -152,8 +180,10 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
struct dispatch_loop *dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));
aws_ref_count_init(&dispatch_loop->ref_count, loop, s_dispatch_event_loop_destroy);

dispatch_loop->dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc);

dispatch_loop->dispatch_queue =
dispatch_queue_create("com.amazonaws.commonruntime.eventloop", DISPATCH_QUEUE_SERIAL);
dispatch_queue_create((char *)dispatch_loop->dispatch_queue_id->bytes, DISPATCH_QUEUE_SERIAL);
if (!dispatch_loop->dispatch_queue) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop);
aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
Expand Down Expand Up @@ -188,8 +218,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
if (dispatch_loop->dispatch_queue) {
dispatch_release(dispatch_loop->dispatch_queue);
}

aws_mem_release(alloc, dispatch_loop);
aws_ref_count_release(&dispatch_loop->ref_count);
aws_event_loop_clean_up_base(loop);

clean_up_loop:
Expand All @@ -202,6 +231,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop);

struct dispatch_loop *dispatch_loop = event_loop->impl_data;
dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;

/* make sure the loop is running so we can schedule a last task. */
s_run(event_loop);
Expand Down Expand Up @@ -232,10 +263,10 @@ static void s_destroy(struct aws_event_loop *event_loop) {

dispatch_loop->synced_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);
});

/* we don't want it stopped while shutting down. dispatch_release will fail on a suspended loop. */
dispatch_release(dispatch_loop->dispatch_queue);
dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
});

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
aws_ref_count_release(&dispatch_loop->ref_count);
Expand Down Expand Up @@ -367,13 +398,19 @@ void run_iteration(void *context) {
}
}

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;

// run all scheduled tasks
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(event_loop, &now_ns);
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(event_loop);

end_iteration(entry);

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
}

// Checks if a new iteration task needs to be scheduled, given a target timestamp
Expand Down Expand Up @@ -412,11 +449,11 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
}
}

aws_mutex_unlock(&dispatch_loop->synced_data.lock);

if (should_schedule) {
try_schedule_new_iteration(event_loop, 0);
}

aws_mutex_unlock(&dispatch_loop->synced_data.lock);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
Expand Down Expand Up @@ -461,6 +498,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
// tasks as cross thread tasks. Ignore the caller thread verification for apple
// dispatch queue.
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
(void)event_loop;
return true;
struct dispatch_loop *dispatch_queue = event_loop->impl_data;
bool result = dispatch_queue->processing &&
aws_thread_thread_id_equal(dispatch_queue->m_current_thread_id, aws_thread_current_thread_id());
return result;
}

0 comments on commit 0918e76

Please sign in to comment.