Skip to content

Commit

Permalink
clean up dispatch queue changes
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 26, 2024
1 parent 5b515cb commit 435a70d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 59 deletions.
36 changes: 0 additions & 36 deletions include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,6 @@ struct aws_event_loop {
void *impl_data;
};

#ifdef AWS_USE_DISPATCH_QUEUE
# include <dispatch/dispatch.h>
# include <dispatch/queue.h>

struct dispatch_scheduling_state {
// Let's us skip processing an iteration task if one is already in the middle
// of executing
bool is_executing_iteration;

// List<scheduled_service_entry> in sorted order by timestamp
//
// When we go to schedule a new iteration, we check here first to see
// if our scheduling attempt is redundant
struct aws_linked_list scheduled_services;
};

struct dispatch_loop {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
aws_thread_id_t m_current_thread_id;
bool processing;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
struct aws_mutex lock;
bool suspended;
} synced_data;

bool wakeup_schedule_needed;
};
#endif

struct aws_event_loop_local_object;
typedef void(aws_event_loop_on_local_object_removed_fn)(struct aws_event_loop_local_object *);

Expand Down
6 changes: 5 additions & 1 deletion include/aws/io/private/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ struct dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;

aws_thread_id_t m_current_thread_id;
bool processing;
// Apple dispatch queue uses the id string to identify the dispatch queue
struct aws_string* dispatch_queue_id;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
Expand Down
47 changes: 27 additions & 20 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,36 @@ static void s_dispatch_event_loop_destroy(void *context) {
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop);

aws_mutex_clean_up(&dispatch_loop->synced_data.lock);
aws_string_destroy(dispatch_loop->dispatch_queue_id);
aws_mem_release(dispatch_loop->allocator, dispatch_loop);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);

aws_thread_decrement_unjoined_count();
}


/** Return a aws_string* with unique dispatch queue id string. The id is In format of "com.amazonaws.commonruntime.eventloop.<UUID>"*/
static struct aws_string* s_get_unique_dispatch_queue_id(struct aws_allocator* alloc)
{
struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);
char uuid_str[AWS_UUID_STR_LEN] = {0};
struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str));
uuid_buf.len = 0;
aws_uuid_to_str(&uuid, &uuid_buf);
struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf);

struct aws_byte_buf dispatch_queue_id_buf;
aws_byte_buf_init_copy_from_cursor(&dispatch_queue_id_buf, alloc, aws_byte_cursor_from_c_str("com.amazonaws.commonruntime.eventloop."));

aws_byte_buf_append_dynamic(&dispatch_queue_id_buf, &uuid_cursor);

struct aws_string* result = aws_string_new_from_buf(alloc, &dispatch_queue_id_buf);
aws_byte_buf_clean_up(&dispatch_queue_id_buf);
return result;
}

/* Setup a dispatch_queue with a scheduler. */
struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
struct aws_allocator *alloc,
Expand All @@ -154,24 +177,9 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
struct dispatch_loop *dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct dispatch_loop));
aws_ref_count_init(&dispatch_loop->ref_count, loop, s_dispatch_event_loop_destroy);

struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);

char uuid_str[AWS_UUID_STR_LEN] = {0};
struct aws_byte_buf uuid_buf = aws_byte_buf_from_array(uuid_str, sizeof(uuid_str));
uuid_buf.len = 0;
aws_uuid_to_str(&uuid, &uuid_buf);
struct aws_byte_cursor uuid_cursor = aws_byte_cursor_from_buf(&uuid_buf);

AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "[DEBUG DISPATCH QUEUE uuID ] : %s", uuid_cursor.ptr);

struct aws_byte_buf dispatch_queue_id = aws_byte_buf_from_c_str("com.amazonaws.commonruntime.eventloop.");
dispatch_queue_id.allocator = alloc;
// aws_byte_buf_append_dynamic(&dispatch_queue_id, &uuid_cursor);

AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "[DEBUG DISPATCH QUEUE ID ] : %s", dispatch_queue_id.buffer);

dispatch_loop->dispatch_queue = dispatch_queue_create((char *)uuid_cursor.ptr, DISPATCH_QUEUE_SERIAL);
dispatch_loop->dispatch_queue_id = s_get_unique_dispatch_queue_id(alloc);

dispatch_loop->dispatch_queue = dispatch_queue_create((char *)dispatch_loop->dispatch_queue_id->bytes, DISPATCH_QUEUE_SERIAL);
if (!dispatch_loop->dispatch_queue) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to create dispatch queue.", (void *)loop);
aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
Expand Down Expand Up @@ -206,8 +214,7 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(
if (dispatch_loop->dispatch_queue) {
dispatch_release(dispatch_loop->dispatch_queue);
}

aws_mem_release(alloc, dispatch_loop);
aws_ref_count_release(&dispatch_loop->ref_count);
aws_event_loop_clean_up_base(loop);

clean_up_loop:
Expand Down
3 changes: 2 additions & 1 deletion source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ static int s_socket_connect_fn(
* we uninstall this handler right before calling close on the socket so this shouldn't
* get hit unless it was triggered remotely */
// Cancel the connection timeout task
s_schedule_cancel_task(socket, &nw_socket->connect_args->task);
s_schedule_cancel_task(socket, &nw_socket->timeout_args->task);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: socket closed remotely.",
Expand Down Expand Up @@ -1259,6 +1259,7 @@ static int s_socket_write_fn(

struct nw_socket *nw_socket = socket->impl;
aws_ref_count_acquire(&nw_socket->ref_count);
nw_connection_t handle = nw_socket->nw_connection;

AWS_ASSERT(written_fn);

Expand Down
2 changes: 1 addition & 1 deletion source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ void aws_event_loop_destroy(struct aws_event_loop *event_loop) {
}

AWS_ASSERT(event_loop->vtable && event_loop->vtable->destroy);
AWS_EVENT_LOOP_NOT_CALLER_THREAD(event_loop);
AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop));

event_loop->vtable->destroy(event_loop);
}
Expand Down

0 comments on commit 435a70d

Please sign in to comment.