diff --git a/include/aws/io/private/dispatch_queue.h b/include/aws/io/private/dispatch_queue.h new file mode 100644 index 000000000..4c7cf9187 --- /dev/null +++ b/include/aws/io/private/dispatch_queue.h @@ -0,0 +1,52 @@ +#ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H +#define AWS_IO_PRIVATE_DISPATCH_QUEUE_H +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +struct secure_transport_ctx { + struct aws_tls_ctx ctx; + CFAllocatorRef wrapped_allocator; + CFArrayRef certs; + SecIdentityRef secitem_identity; + CFArrayRef ca_cert; + enum aws_tls_versions minimum_version; + struct aws_string *alpn_list; + bool verify_peer; +}; + +struct dispatch_scheduling_state { + // Let's us skip processing an iteration task if one is already in the middle + // of executing + bool is_executing_iteration; + + // List in sorted order by timestamp + // + // When we go to schedule a new iteration, we check here first to see + // if our scheduling attempt is redundant + struct aws_linked_list scheduled_services; +}; + +struct dispatch_loop { + struct aws_allocator *allocator; + struct aws_ref_count ref_count; + dispatch_queue_t dispatch_queue; + struct aws_task_scheduler scheduler; + struct aws_linked_list local_cross_thread_tasks; + + struct { + struct dispatch_scheduling_state scheduling_state; + struct aws_linked_list cross_thread_tasks; + struct aws_mutex lock; + bool suspended; + } synced_data; + + bool wakeup_schedule_needed; +}; + +#endif /* #ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H */ \ No newline at end of file diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 478634e43..cfe2f112a 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -16,6 +16,7 @@ #include #include #include +#include static void s_destroy(struct aws_event_loop *event_loop); static int s_run(struct aws_event_loop *event_loop); @@ -46,18 +47,6 @@ static struct aws_event_loop_vtable s_vtable = { .is_on_callers_thread = s_is_on_callers_thread, }; -struct dispatch_scheduling_state { - // Let's us skip processing an iteration task if one is already in the middle - // of executing - bool is_executing_iteration; - - // List in sorted order by timestamp - // - // When we go to schedule a new iteration, we check here first to see - // if our scheduling attempt is redundant - struct aws_linked_list scheduled_services; -}; - struct scheduled_service_entry { struct aws_allocator *allocator; uint64_t timestamp; @@ -65,23 +54,6 @@ struct scheduled_service_entry { struct aws_event_loop *loop; // might eventually need to be ref-counted for cleanup? }; -struct dispatch_loop { - struct aws_allocator *allocator; - struct aws_ref_count ref_count; - dispatch_queue_t dispatch_queue; - struct aws_task_scheduler scheduler; - struct aws_linked_list local_cross_thread_tasks; - - struct { - struct dispatch_scheduling_state scheduling_state; - struct aws_linked_list cross_thread_tasks; - struct aws_mutex lock; - bool suspended; - } synced_data; - - bool wakeup_schedule_needed; -}; - struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loop *loop, uint64_t timestamp) { struct scheduled_service_entry *entry = aws_mem_calloc(loop->alloc, 1, sizeof(struct scheduled_service_entry)); diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index 4207f759e..d9f5993a5 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -10,6 +10,8 @@ #include #include +#include +#include #include #include @@ -73,7 +75,7 @@ enum socket_state { CLOSED, }; -struct nw_socket_connect_args { +struct nw_socket_timeout_args { struct aws_task task; struct aws_allocator *allocator; struct aws_socket *socket; @@ -89,10 +91,10 @@ struct nw_socket { aws_socket_on_readable_fn *on_readable; void *on_readable_user_data; bool setup_run; - bool setup_closing; + bool socket_open; bool read_queued; bool is_listener; - struct nw_socket_connect_args *connect_args; + struct nw_socket_timeout_args *timeout_args; aws_socket_on_connection_result_fn *on_connection_result_fn; void *connect_accept_user_data; }; @@ -269,7 +271,7 @@ static void s_socket_impl_destroy(void *sock_ptr) { nw_socket->nw_connection = NULL; } - aws_mem_release(nw_socket->allocator, nw_socket->connect_args); + aws_mem_release(nw_socket->allocator, nw_socket->timeout_args); aws_mem_release(nw_socket->allocator, nw_socket); nw_socket = NULL; } @@ -316,27 +318,27 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_ // We will clean up the task and args on socket destory. return; } - struct nw_socket_connect_args *socket_args = args; + struct nw_socket_timeout_args *timeout_args = args; AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); - /* successful connection will have nulled out connect_args->socket */ - if (socket_args->socket) { + /* successful connection will have nulled out timeout_args->socket */ + if (timeout_args->socket) { AWS_LOGF_ERROR( AWS_LS_IO_SOCKET, "id=%p handle=%p: timed out, shutting down.", - (void *)socket_args->socket, - socket_args->socket->io_handle.data.handle); + (void *)timeout_args->socket, + timeout_args->socket->io_handle.data.handle); - socket_args->socket->state = TIMEDOUT; + timeout_args->socket->state = TIMEDOUT; int error_code = AWS_IO_SOCKET_TIMEOUT; - // socket_args->socket->event_loop = NULL; - struct nw_socket *socket_impl = socket_args->socket->impl; + // timeout_args->socket->event_loop = NULL; + struct nw_socket *socket_impl = timeout_args->socket->impl; aws_raise_error(error_code); - struct aws_socket *socket = socket_args->socket; - /*socket close sets socket_args->socket to NULL and - * socket_impl->connect_args to NULL. */ + struct aws_socket *socket = timeout_args->socket; + /*socket close sets timeout_args->socket to NULL and + * socket_impl->timeout_args to NULL. */ aws_socket_close(socket); socket_impl->on_connection_result_fn(socket, error_code, socket_impl->connect_accept_user_data); } @@ -462,111 +464,118 @@ static int s_socket_connect_fn( struct nw_socket *socket_impl = socket->impl; - nw_socket->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_connect_args)); - if (!nw_socket->connect_args) { - return AWS_OP_ERR; - } + nw_socket->timeout_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_timeout_args)); - nw_socket->connect_args->socket = socket; - nw_socket->connect_args->allocator = socket->allocator; + nw_socket->timeout_args->socket = socket; + nw_socket->timeout_args->allocator = socket->allocator; aws_task_init( - &nw_socket->connect_args->task, + &nw_socket->timeout_args->task, s_handle_socket_timeout, - nw_socket->connect_args, + nw_socket->timeout_args, "NWSocketConnectionTimeoutTask"); nw_connection_t handle = socket->io_handle.data.handle; /* set a handler for socket state changes. This is where we find out if the connection timed out, was successful, * was disconnected etc .... */ - nw_connection_set_state_changed_handler(handle, ^(nw_connection_state_t state, nw_error_t error) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: connection state changed hanlder, state: %d", - (void *)socket, - handle, - state); - /* we're connected! */ - if (state == nw_connection_state_ready) { - AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p handle=%p: connection success", (void *)socket, handle); - - nw_path_t path = nw_connection_copy_current_path(handle); - nw_endpoint_t local_endpoint = nw_path_copy_effective_local_endpoint(path); - nw_release(path); - const char *hostname = nw_endpoint_get_hostname(local_endpoint); - uint16_t port = nw_endpoint_get_port(local_endpoint); - - size_t hostname_len = strlen(hostname); - size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address); - size_t to_copy = aws_min_size(hostname_len, buffer_size); - memcpy(socket->local_endpoint.address, hostname, to_copy); - socket->local_endpoint.port = port; - nw_release(local_endpoint); - - // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task); - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: local endpoint %s:%d", - (void *)socket, - handle, - socket->local_endpoint.address, - port); - - socket->state = CONNECTED_WRITE | CONNECTED_READ; - aws_ref_count_acquire(&nw_socket->ref_count); - on_connection_result(socket, AWS_OP_SUCCESS, user_data); - aws_ref_count_release(&nw_socket->ref_count); - nw_socket->setup_run = true; - } else if (error) { - /* any error, including if closed remotely in error */ - int error_code = nw_error_get_error_code(error); - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: connection error %d", - (void *)socket, - socket->io_handle.data.handle, - error_code); - // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task); - /* we don't let this thing do DNS or TLS. Everything had better be a posix error. */ - AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix); - error_code = s_determine_socket_error(error_code); - nw_socket->last_error = error_code; - aws_raise_error(error_code); - socket->state = ERROR; - aws_ref_count_acquire(&nw_socket->ref_count); - if (!nw_socket->setup_run) { - on_connection_result(socket, error_code, user_data); - nw_socket->setup_run = true; - } else if (socket->readable_fn) { - socket->readable_fn(socket, nw_socket->last_error, socket->readable_user_data); - } - - aws_ref_count_release(&nw_socket->ref_count); - } else if (state == nw_connection_state_cancelled || state == nw_connection_state_failed) { - /* this should only hit when the socket was closed by not us. Note, - * we uninstall this handler right before calling close on the socket so this shouldn't - * get hit unless it was triggered remotely */ - // Cancel the connection timeout task - aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task); - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); - socket->state = CLOSED; - aws_ref_count_acquire(&nw_socket->ref_count); - aws_raise_error(AWS_IO_SOCKET_CLOSED); - if (!nw_socket->setup_run) { - on_connection_result(socket, AWS_IO_SOCKET_CLOSED, user_data); - nw_socket->setup_run = true; - } else if (socket->readable_fn) { - socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); - } - aws_ref_count_release(&nw_socket->ref_count); - } - - }); + nw_connection_set_state_changed_handler( + socket->io_handle.data.handle, ^(nw_connection_state_t state, nw_error_t error) { + /* we're connected! */ + if (state == nw_connection_state_ready) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: connection success", + (void *)socket, + socket->io_handle.data.handle); + nw_socket->socket_open = true; + nw_path_t path = nw_connection_copy_current_path(socket->io_handle.data.handle); + nw_endpoint_t local_endpoint = nw_path_copy_effective_local_endpoint(path); + nw_release(path); + const char *hostname = nw_endpoint_get_hostname(local_endpoint); + uint16_t port = nw_endpoint_get_port(local_endpoint); + + size_t hostname_len = strlen(hostname); + size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address); + size_t to_copy = aws_min_size(hostname_len, buffer_size); + memcpy(socket->local_endpoint.address, hostname, to_copy); + socket->local_endpoint.port = port; + nw_release(local_endpoint); + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: local endpoint %s:%d", + (void *)socket, + socket->io_handle.data.handle, + socket->local_endpoint.address, + port); + // Cancel the connection timeout task + aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); + socket->state = CONNECTED_WRITE | CONNECTED_READ; + nw_socket->setup_run = true; + aws_ref_count_acquire(&nw_socket->ref_count); + on_connection_result(socket, AWS_OP_SUCCESS, user_data); + aws_ref_count_release(&nw_socket->ref_count); + } else if (error) { + /* any error, including if closed remotely in error */ + int error_code = nw_error_get_error_code(error); + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: connection error %d", + (void *)socket, + socket->io_handle.data.handle, + error_code); + + // Cancel the connection timeout task + aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); + + /* we don't let this thing do DNS or TLS. Everything had better be a posix error. */ + // AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix); + // DEBUG WIP we do in fact allow this to do TLS + error_code = s_determine_socket_error(error_code); + nw_socket->last_error = error_code; + aws_raise_error(error_code); + socket->state = ERROR; + aws_ref_count_acquire(&nw_socket->ref_count); + if (!nw_socket->setup_run) { + on_connection_result(socket, error_code, user_data); + nw_socket->setup_run = true; + } else if (socket->readable_fn) { + socket->readable_fn(socket, nw_socket->last_error, socket->readable_user_data); + } + aws_ref_count_release(&nw_socket->ref_count); + } else if (state == nw_connection_state_cancelled || state == nw_connection_state_failed) { + /* this should only hit when the socket was closed by not us. Note, + * we uninstall this handler right before calling close on the socket so this shouldn't + * get hit unless it was triggered remotely */ + // Cancel the connection timeout task + aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task); + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: socket closed remotely.", + (void *)socket, socket->io_handle.data.handle); + socket->state = CLOSED; + aws_ref_count_acquire(&nw_socket->ref_count); + aws_raise_error(AWS_IO_SOCKET_CLOSED); + if (!nw_socket->setup_run) { + on_connection_result(socket, AWS_IO_SOCKET_CLOSED, user_data); + nw_socket->setup_run = true; + } else if (socket->readable_fn) { + socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); + } + aws_ref_count_release(&nw_socket->ref_count); + } else if (state == nw_connection_state_waiting) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: socket connection is waiting for a usable network before re-attempting.", + (void *)socket, socket->io_handle.data.handle); + } else if (state == nw_connection_state_preparing) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: socket connection is in the process of establishing.", + (void *)socket, socket->io_handle.data.handle); + } + }); nw_connection_start(socket->io_handle.data.handle); nw_retain(socket->io_handle.data.handle); @@ -590,8 +599,8 @@ static int s_socket_connect_fn( (void *)socket, socket->io_handle.data.handle, (unsigned long long)timeout); - nw_socket->connect_args->task.timestamp = timeout; - aws_event_loop_schedule_task_future(event_loop, &nw_socket->connect_args->task, timeout); + nw_socket->timeout_args->task.timestamp = timeout; + aws_event_loop_schedule_task_future(event_loop, &nw_socket->timeout_args->task, timeout); return AWS_OP_SUCCESS; } @@ -813,11 +822,14 @@ static int s_socket_close_fn(struct aws_socket *socket) { nw_listener_cancel(socket->io_handle.data.handle); } else { + if (nw_socket->socket_open) { + nw_connection_cancel(socket->io_handle.data.handle); + } + /* Setting to NULL removes previously set handler from nw_connection_t */ nw_connection_set_state_changed_handler(socket->io_handle.data.handle, NULL); - nw_connection_cancel(socket->io_handle.data.handle); } - nw_socket->setup_closing = true; + nw_socket->socket_open = false; return AWS_OP_SUCCESS; } @@ -895,7 +907,7 @@ static void s_schedule_next_read(struct aws_socket *socket) { return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); } - // Acquire nw_socket after we call connection receive, and released it when handler is called. + // Acquire nw_socket after we call connection receive, and released it when handler is called. aws_ref_count_acquire(&nw_socket->ref_count); /* read and let me know when you've done it. */ @@ -908,7 +920,7 @@ static void s_schedule_next_read(struct aws_socket *socket) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: read cb invoked", (void *)socket, socket->io_handle.data.handle); - if (nw_socket->setup_closing) { + if (!nw_socket->socket_open) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); aws_raise_error(AWS_IO_SOCKET_CLOSED); @@ -1062,7 +1074,7 @@ static int s_socket_write_fn( socket->io_handle.data.handle); return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); } - + struct nw_socket *nw_socket = socket->impl; aws_ref_count_acquire(&nw_socket->ref_count); nw_connection_t handle = socket->io_handle.data.handle; @@ -1077,8 +1089,8 @@ static int s_socket_write_fn( "id=%p handle=%p: processing write requests, called from aws_socket_write", (void *)socket, handle); - - if (nw_socket->setup_closing) { + + if (!nw_socket->socket_open) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", @@ -1087,7 +1099,7 @@ static int s_socket_write_fn( written_fn(socket, 0, 0, user_data); goto nw_socket_release; } - + AWS_LOGF_ERROR( AWS_LS_IO_SOCKET, "id=%p handle=%p: DEBUG:: callback writing message: %p", diff --git a/source/darwin/secure_transport_tls_channel_handler.c b/source/darwin/secure_transport_tls_channel_handler.c index a06d140ee..b8fa08b48 100644 --- a/source/darwin/secure_transport_tls_channel_handler.c +++ b/source/darwin/secure_transport_tls_channel_handler.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include