diff --git a/CMakeLists.txt b/CMakeLists.txt index 707d60d7f..b4dbb8311 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,6 +118,7 @@ elseif (APPLE) file(GLOB AWS_IO_OS_SRC "source/bsd/*.c" "source/posix/*.c" + "source/darwin/nw_socket.c" "source/darwin/darwin_pki_utils.c" "source/darwin/secure_transport_tls_channel_handler.c" ) diff --git a/include/aws/io/private/dispatch_queue.h b/include/aws/io/private/dispatch_queue.h index 6bd2a2026..76175a041 100644 --- a/include/aws/io/private/dispatch_queue.h +++ b/include/aws/io/private/dispatch_queue.h @@ -6,6 +6,8 @@ */ #include +#include +#include #include #include diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index acdd0409d..bbbb71f99 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -81,6 +81,27 @@ struct nw_socket_timeout_args { struct aws_socket *socket; }; +struct nw_socket_readable_args { + int error_code; + struct aws_allocator *allocator; + struct aws_socket *socket; +}; + +struct nw_socket_written_args { + int error_code; + struct aws_allocator *allocator; + struct aws_socket *socket; + aws_socket_on_write_completed_fn *written_fn; + void *user_data; + size_t bytes_written; +}; + +struct nw_socket_cancel_task_args { + struct aws_allocator *allocator; + struct aws_socket *socket; + struct aws_task *task_to_cancel; +}; + struct nw_socket { struct aws_allocator *allocator; struct aws_ref_count ref_count; @@ -344,6 +365,124 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_ } } +static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_task_status status) { + // TODO: WAHT IF THE TASK IS CANCELED??? + + (void)status; + struct nw_socket_readable_args *args = arg; + + struct nw_socket *nw_socket = args->socket->impl; + nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data); + + aws_mem_release(args->allocator, task); + aws_mem_release(args->allocator, args); +} + +static void s_schedule_on_readable(struct aws_socket *socket, int error_code) { + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + ; + struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); + + args->socket = socket; + args->allocator = socket->allocator; + args->error_code = error_code; + + aws_task_init(task, s_process_readable_task, args, "readableTask"); + + aws_event_loop_schedule_task_now(socket->event_loop, task); +} + +static void s_process_connection_success_task(struct aws_task *task, void *arg, enum aws_task_status status) { + // TODO: WAHT IF THE TASK IS CANCELED??? + + (void)status; + struct nw_socket_readable_args *args = arg; + + struct nw_socket *nw_socket = args->socket->impl; + nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data); + + aws_mem_release(args->allocator, task); + aws_mem_release(args->allocator, args); +} + +static void s_schedule_on_connection_success(struct aws_socket *socket, int error_code) { + + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + ; + struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); + + args->socket = socket; + args->allocator = socket->allocator; + args->error_code = error_code; + + aws_task_init(task, s_process_connection_success_task, args, "connectionSuccessTask"); + aws_event_loop_schedule_task_now(socket->event_loop, task); +} + +static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) { + // TODO: WAHT IF THE TASK IS CANCELED??? + + (void)status; + struct nw_socket_cancel_task_args *args = arg; + + if (status == AWS_TASK_STATUS_RUN_READY) + aws_event_loop_cancel_task(args->socket->event_loop, args->task_to_cancel); + + aws_mem_release(args->allocator, task); + aws_mem_release(args->allocator, args); +} + +// As cancel task has to run on the same thread & we dont have control on dispatch queue thread, +// we always schedule the cancel task on event loop +static void s_schedule_cancel_task(struct aws_socket *socket, struct aws_task *task_to_cancel) { + + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + ; + struct nw_socket_cancel_task_args *args = + aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_cancel_task_args)); + + args->socket = socket; + args->allocator = socket->allocator; + args->task_to_cancel = task_to_cancel; + + aws_task_init(task, s_process_cancel_task, args, "cancelTaskTask"); + aws_event_loop_schedule_task_now(socket->event_loop, task); +} + +static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task_status status) { + // TODO: WAHT IF THE TASK IS CANCELED??? + + (void)status; + struct nw_socket_written_args *args = arg; + + args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data); + + aws_mem_release(args->allocator, task); + aws_mem_release(args->allocator, args); +} + +static void s_schedule_write_fn( + struct aws_socket *socket, + int error_code, + size_t bytes_written, + void *user_data, + aws_socket_on_write_completed_fn *written_fn) { + + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + ; + struct nw_socket_written_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_written_args)); + + args->socket = socket; + args->allocator = socket->allocator; + args->error_code = error_code; + args->written_fn = written_fn; + args->user_data = user_data; + args->bytes_written = bytes_written; + + aws_task_init(task, s_process_write_task, args, "writtenTask"); + aws_event_loop_schedule_task_now(socket->event_loop, task); +} + static int s_socket_connect_fn( struct aws_socket *socket, const struct aws_socket_endpoint *remote_endpoint, @@ -461,9 +600,6 @@ static int s_socket_connect_fn( nw_socket->on_connection_result_fn = on_connection_result; nw_socket->connect_accept_user_data = user_data; - - struct nw_socket *socket_impl = socket->impl; - nw_socket->timeout_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_timeout_args)); nw_socket->timeout_args->socket = socket; @@ -475,8 +611,6 @@ static int s_socket_connect_fn( nw_socket->timeout_args, "NWSocketConnectionTimeoutTask"); - nw_connection_t handle = socket->io_handle.data.handle; - /* set a handler for socket state changes. This is where we find out if the connection timed out, was successful, * was disconnected etc .... */ nw_connection_set_state_changed_handler( @@ -510,12 +644,13 @@ static int s_socket_connect_fn( socket->local_endpoint.address, port); // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); + s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); socket->state = CONNECTED_WRITE | CONNECTED_READ; nw_socket->setup_run = true; aws_ref_count_acquire(&nw_socket->ref_count); - on_connection_result(socket, AWS_OP_SUCCESS, user_data); + s_schedule_on_connection_success(socket, AWS_OP_SUCCESS); aws_ref_count_release(&nw_socket->ref_count); + } else if (error) { /* any error, including if closed remotely in error */ int error_code = nw_error_get_error_code(error); @@ -525,10 +660,8 @@ static int s_socket_connect_fn( (void *)socket, socket->io_handle.data.handle, error_code); - // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); - + s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); /* we don't let this thing do DNS or TLS. Everything had better be a posix error. */ // AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix); // DEBUG WIP we do in fact allow this to do TLS @@ -538,18 +671,19 @@ static int s_socket_connect_fn( socket->state = ERROR; aws_ref_count_acquire(&nw_socket->ref_count); if (!nw_socket->setup_run) { - on_connection_result(socket, error_code, user_data); + s_schedule_on_connection_success(socket, error_code); nw_socket->setup_run = true; } else if (socket->readable_fn) { - socket->readable_fn(socket, nw_socket->last_error, socket->readable_user_data); + s_schedule_on_readable(socket, nw_socket->last_error); } + aws_ref_count_release(&nw_socket->ref_count); } else if (state == nw_connection_state_cancelled || state == nw_connection_state_failed) { /* this should only hit when the socket was closed by not us. Note, * we uninstall this handler right before calling close on the socket so this shouldn't * get hit unless it was triggered remotely */ // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); + s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed remotely.", @@ -559,10 +693,10 @@ static int s_socket_connect_fn( aws_ref_count_acquire(&nw_socket->ref_count); aws_raise_error(AWS_IO_SOCKET_CLOSED); if (!nw_socket->setup_run) { - on_connection_result(socket, AWS_IO_SOCKET_CLOSED, user_data); + s_schedule_on_connection_success(socket, AWS_IO_SOCKET_CLOSED); nw_socket->setup_run = true; } else if (socket->readable_fn) { - socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); + s_schedule_on_readable(socket, AWS_IO_SOCKET_CLOSED); } aws_ref_count_release(&nw_socket->ref_count); } else if (state == nw_connection_state_waiting) { @@ -907,7 +1041,8 @@ static void s_schedule_next_read(struct aws_socket *socket) { "id=%p handle=%p: cannot read to because it is not connected", (void *)socket, socket->io_handle.data.handle); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + return; } // Acquire nw_socket after we call connection receive, and released it when handler is called. @@ -940,7 +1075,8 @@ static void s_schedule_next_read(struct aws_socket *socket) { (void *)socket, socket->io_handle.data.handle, (int)dispatch_data_get_size(data)); - nw_socket->on_readable(socket, AWS_ERROR_SUCCESS, nw_socket->on_readable_user_data); + + s_schedule_on_readable(socket, AWS_ERROR_SUCCESS); } if (!is_complete) { s_schedule_next_read(socket); @@ -956,7 +1092,7 @@ static void s_schedule_next_read(struct aws_socket *socket) { socket->io_handle.data.handle, error_code); - nw_socket->on_readable(socket, error_code, nw_socket->on_readable_user_data); + s_schedule_on_readable(socket, error_code); } aws_ref_count_release(&nw_socket->ref_count); }); @@ -1080,44 +1216,59 @@ static int s_socket_write_fn( struct nw_socket *nw_socket = socket->impl; aws_ref_count_acquire(&nw_socket->ref_count); - nw_connection_t handle = socket->io_handle.data.handle; AWS_ASSERT(written_fn); - dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, NULL, DISPATCH_DATA_DESTRUCTOR_FREE); - nw_connection_send(handle, data, _nw_content_context_default_message, true, ^(nw_error_t error) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: processing write requests, called from aws_socket_write", - (void *)socket, - handle); - - if (!nw_socket->socket_open) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, handle); - written_fn(socket, 0, 0, user_data); - goto nw_socket_release; - } - - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, "id=%p handle=%p: DEBUG:: callback writing message: %p", (void *)socket, handle, user_data); - int error_code = !error || nw_error_get_error_code(error) == 0 - ? AWS_OP_SUCCESS - : s_determine_socket_error(nw_error_get_error_code(error)); - - if (error_code) { - nw_socket->last_error = error_code; - aws_raise_error(error_code); + dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT); + nw_connection_send( + socket->io_handle.data.handle, data, _nw_content_context_default_message, true, ^(nw_error_t error) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: processing write requests, called from aws_socket_write", + (void *)socket, + socket->io_handle.data.handle); + + if (!nw_socket->socket_open) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); + // As the socket is not open, we no longer have access to the event loop to schedule tasks + // directly execute the written callback instead of scheduling a task. + written_fn(socket, 0, 0, user_data); + goto nw_socket_release; + } + AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, "id=%p handle=%p: error during write %d", (void *)socket, handle, error_code); - } - - size_t written_size = dispatch_data_get_size(data); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p handle=%p: send written size %d", (void *)socket, handle, (int)written_size); - written_fn(socket, error_code, !error_code ? written_size : 0, user_data); - nw_socket_release: - aws_ref_count_release(&nw_socket->ref_count); - }); + AWS_LS_IO_SOCKET, + "id=%p handle=%p: DEBUG:: callback writing message: %p", + (void *)socket, + socket->io_handle.data.handle, + user_data); + int error_code = !error || nw_error_get_error_code(error) == 0 + ? AWS_OP_SUCCESS + : s_determine_socket_error(nw_error_get_error_code(error)); + + if (error_code) { + nw_socket->last_error = error_code; + aws_raise_error(error_code); + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: error during write %d", + (void *)socket, + socket->io_handle.data.handle, + error_code); + } + + size_t written_size = dispatch_data_get_size(data); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: send written size %d", + (void *)socket, + socket->io_handle.data.handle, + (int)written_size); + s_schedule_write_fn(socket, error_code, !error_code ? written_size : 0, user_data, written_fn); + nw_socket_release: + aws_ref_count_release(&nw_socket->ref_count); + }); return AWS_OP_SUCCESS; }