Skip to content

Commit

Permalink
Use atomic for event loop state
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Sep 26, 2024
1 parent 1700c76 commit 88aff4d
Showing 1 changed file with 93 additions and 71 deletions.
164 changes: 93 additions & 71 deletions source/qnx/ionotify_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct aws_ionotify_event_loop {
struct aws_thread_options thread_options;
aws_thread_id_t thread_joined_to;
struct aws_atomic_var running_thread_id;
enum aws_ionotify_event_loop_state event_loop_state;
struct aws_atomic_var event_loop_state;

/* Channel to receive I/O events. Resource managers open connections to this channel to send their events. */
int io_events_channel_id;
Expand Down Expand Up @@ -119,80 +119,29 @@ static short CROSS_THREAD_PULSE_SIGEV_CODE = _PULSE_CODE_MINAVAIL;
static short IO_EVENT_KICKSTART_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 1;
static short IO_EVENT_UPDATE_ERROR_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 2;

static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop);

struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;

/* FIXME Data race */
if (ionotify_event_loop->event_loop_state == AIELS_LOOP_STARTED) {
/* we don't know if stop() has been called by someone else,
* just call stop() again and wait for event-loop to finish. */
aws_event_loop_stop(event_loop);
s_wait_for_stop_completion(event_loop);
}

aws_hash_table_clean_up(&ionotify_event_loop->handles);

if (aws_task_scheduler_is_valid(&ionotify_event_loop->scheduler)) {
/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
ionotify_event_loop->thread_joined_to = aws_thread_current_thread_id();
aws_atomic_store_ptr(&ionotify_event_loop->running_thread_id, &ionotify_event_loop->thread_joined_to);
aws_task_scheduler_clean_up(&ionotify_event_loop->scheduler);

while (!aws_linked_list_empty(&ionotify_event_loop->task_pre_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&ionotify_event_loop->task_pre_queue);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}
}

if (ionotify_event_loop->pulse_connection_id != 0 && ionotify_event_loop->pulse_connection_id != -1) {
int rc = ConnectDetach(ionotify_event_loop->pulse_connection_id);
int errno_value = errno;
if (rc == -1) {
AWS_LOGF_WARN(
AWS_LS_IO_EVENT_LOOP, "id=%p: ConnectDetach failed with errno %d", (void *)event_loop, errno_value);
}
}

if (ionotify_event_loop->io_events_channel_id != 0 && ionotify_event_loop->io_events_channel_id != -1) {
int rc = ChannelDestroy(ionotify_event_loop->io_events_channel_id);
int errno_value = errno;
if (rc == -1) {
AWS_LOGF_WARN(
AWS_LS_IO_EVENT_LOOP, "id=%p: ChannelDestroy failed with errno %d", (void *)event_loop, errno_value);
}
}

aws_thread_clean_up(&ionotify_event_loop->thread_created_on);

if (ionotify_event_loop->event_loop_state != AIELS_BASE_UNINITIALIZED) {
aws_event_loop_clean_up_base(event_loop);
}

aws_mem_release(ionotify_event_loop->allocator, ionotify_event_loop);
}
static void s_destroy_ionotify_event_loop(struct aws_ionotify_event_loop *ionotify_event_loop);

/* Setup edge triggered ionotify with a scheduler. */
struct aws_event_loop *aws_event_loop_new_default_with_options(
struct aws_allocator *alloc,
struct aws_allocator *allocator,
const struct aws_event_loop_options *options) {
AWS_PRECONDITION(options);
AWS_PRECONDITION(options->clock);

struct aws_ionotify_event_loop *ionotify_event_loop =
aws_mem_calloc(alloc, 1, sizeof(struct aws_ionotify_event_loop));
aws_mem_calloc(allocator, 1, sizeof(struct aws_ionotify_event_loop));
ionotify_event_loop->allocator = allocator;
struct aws_event_loop *base_event_loop = &ionotify_event_loop->base;

ionotify_event_loop->event_loop_state = AIELS_BASE_UNINITIALIZED;
aws_atomic_store_int_explicit(
&ionotify_event_loop->event_loop_state, AIELS_BASE_UNINITIALIZED, aws_memory_order_relaxed);

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered ionotify", (void *)base_event_loop);
if (aws_event_loop_init_base(&ionotify_event_loop->base, alloc, options->clock)) {
if (aws_event_loop_init_base(&ionotify_event_loop->base, allocator, options->clock)) {
goto error;
}

ionotify_event_loop->event_loop_state = AIELS_LOOP_STOPPED;
aws_atomic_store_int_explicit(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STOPPED, aws_memory_order_relaxed);

if (options->thread_options) {
ionotify_event_loop->thread_options = *options->thread_options;
Expand All @@ -207,7 +156,7 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
ionotify_event_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
aws_atomic_init_ptr(&ionotify_event_loop->stop_task_ptr, NULL);

if (aws_thread_init(&ionotify_event_loop->thread_created_on, alloc)) {
if (aws_thread_init(&ionotify_event_loop->thread_created_on, allocator)) {
goto error;
}

Expand All @@ -216,11 +165,7 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
int errno_value = errno; /* Always cache errno before potential side-effect */
if (ionotify_event_loop->io_events_channel_id == -1) {
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: ChannelCreate failed with errno %d\n",
(void *)base_event_loop,
errno_value,
strerror(errno_value));
AWS_LS_IO_EVENT_LOOP, "id=%p: ChannelCreate failed with errno %d\n", (void *)base_event_loop, errno_value);
goto error;
}
AWS_LOGF_DEBUG(
Expand All @@ -244,12 +189,12 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
goto error;
}

if (aws_task_scheduler_init(&ionotify_event_loop->scheduler, alloc)) {
if (aws_task_scheduler_init(&ionotify_event_loop->scheduler, allocator)) {
AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: aws_task_scheduler_init failed\n", (void *)base_event_loop);
goto error;
}

if (aws_hash_table_init(&ionotify_event_loop->handles, alloc, 32, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) {
if (aws_hash_table_init(&ionotify_event_loop->handles, allocator, 32, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) {
goto error;
}

Expand All @@ -261,13 +206,90 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
return &ionotify_event_loop->base;

error:
s_destroy(&ionotify_event_loop->base);
s_destroy_ionotify_event_loop(ionotify_event_loop);
return NULL;
}

static void s_destroy_ionotify_event_loop(struct aws_ionotify_event_loop *ionotify_event_loop) {
struct aws_event_loop *base_event_loop = &ionotify_event_loop->base;
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)base_event_loop);

int event_loop_state = (int)aws_atomic_load_int(&ionotify_event_loop->event_loop_state);

if (event_loop_state == AIELS_LOOP_STARTED) {
/* we don't know if stop() has been called by someone else,
* just call stop() again and wait for event-loop to finish. */
aws_event_loop_stop(base_event_loop);
s_wait_for_stop_completion(base_event_loop);
}

if (aws_hash_table_is_valid(&ionotify_event_loop->handles)) {
aws_hash_table_clean_up(&ionotify_event_loop->handles);
}

if (aws_task_scheduler_is_valid(&ionotify_event_loop->scheduler)) {
/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
ionotify_event_loop->thread_joined_to = aws_thread_current_thread_id();
aws_atomic_store_ptr(&ionotify_event_loop->running_thread_id, &ionotify_event_loop->thread_joined_to);
aws_task_scheduler_clean_up(&ionotify_event_loop->scheduler);

while (!aws_linked_list_empty(&ionotify_event_loop->task_pre_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&ionotify_event_loop->task_pre_queue);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}
}

if (ionotify_event_loop->pulse_connection_id != 0 && ionotify_event_loop->pulse_connection_id != -1) {
int rc = ConnectDetach(ionotify_event_loop->pulse_connection_id);
int errno_value = errno;
if (rc == -1) {
AWS_LOGF_WARN(
AWS_LS_IO_EVENT_LOOP,
"id=%p: ConnectDetach failed with errno %d",
(void *)base_event_loop,
errno_value);
}
}

if (ionotify_event_loop->io_events_channel_id != 0 && ionotify_event_loop->io_events_channel_id != -1) {
int rc = ChannelDestroy(ionotify_event_loop->io_events_channel_id);
int errno_value = errno;
if (rc == -1) {
AWS_LOGF_WARN(
AWS_LS_IO_EVENT_LOOP,
"id=%p: ChannelDestroy failed with errno %d",
(void *)base_event_loop,
errno_value);
}
}

aws_thread_clean_up(&ionotify_event_loop->thread_created_on);

if (event_loop_state != AIELS_BASE_UNINITIALIZED) {
aws_event_loop_clean_up_base(base_event_loop);
}

aws_mem_release(ionotify_event_loop->allocator, ionotify_event_loop);
}

static void s_destroy(struct aws_event_loop *event_loop) {
s_destroy_ionotify_event_loop(event_loop->impl_data);
}

static int s_run(struct aws_event_loop *event_loop) {
struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;

int current_state = (int)aws_atomic_load_int(&ionotify_event_loop->event_loop_state);
if (current_state != AIELS_LOOP_STOPPED) {
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: Failed to start event-loop thread: event loop state is %d",
(void *)event_loop,
current_state);
return AWS_OP_ERR;
}

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);

ionotify_event_loop->should_continue = true;
Expand All @@ -284,7 +306,7 @@ static int s_run(struct aws_event_loop *event_loop) {
return AWS_OP_ERR;
}

ionotify_event_loop->event_loop_state = AIELS_LOOP_STARTED;
aws_atomic_store_int(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STARTED);

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -323,7 +345,7 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
int result = aws_thread_join(&ionotify_event_loop->thread_created_on);
aws_thread_decrement_unjoined_count();
ionotify_event_loop->event_loop_state = AIELS_LOOP_STOPPED;
aws_atomic_store_int(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STOPPED);
return result;
}

Expand Down

0 comments on commit 88aff4d

Please sign in to comment.