From 42ea938cdd6b503d04602f169414601f93ed7daf Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 1 Oct 2024 13:37:18 -0700 Subject: [PATCH] fix nw socket listener --- CMakeLists.txt | 7 +- source/darwin/nw_socket.c | 175 +++++++++++++++++++++++++++++++++----- 2 files changed, 159 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ac03eeed6..b9ec7f9bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -117,7 +117,9 @@ elseif (APPLE) file(GLOB AWS_IO_OS_SRC "source/bsd/*.c" - "source/posix/*.c" + "source/posix/pipe.c" + "source/posix/host_resolver.c" + "source/posix/shared_library.c" "source/darwin/darwin_pki_utils.c" "source/darwin/secure_transport_tls_channel_handler.c" ) @@ -144,6 +146,9 @@ elseif (APPLE) ) list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC}) else () + file(GLOB AWS_IO_DISPATCH_QUEUE_SRC + "source/posix/socket.c" + ) set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE") endif() diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index bbbb71f99..b2af18366 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -75,6 +76,14 @@ enum socket_state { CLOSED, }; +struct nw_socket_listener_args { + int error_code; + struct aws_allocator *allocator; + struct aws_socket *socket; + struct aws_socket *new_socket; + void *user_data; +}; + struct nw_socket_timeout_args { struct aws_task task; struct aws_allocator *allocator; @@ -112,7 +121,7 @@ struct nw_socket { aws_socket_on_readable_fn *on_readable; void *on_readable_user_data; bool setup_run; - bool socket_open; + bool currently_connected; // If the io port is connected. Similar to posix_socket->currently_subscribed. bool read_queued; bool is_listener; struct nw_socket_timeout_args *timeout_args; @@ -162,11 +171,11 @@ static int s_setup_socket_params(struct nw_socket *nw_socket, const struct aws_s }); } else if (options->domain == AWS_SOCKET_LOCAL) { nw_socket->socket_options_to_params = - nw_parameters_create_custom_ip(AF_LOCAL, NW_PARAMETERS_DEFAULT_CONFIGURATION); + nw_parameters_create_secure_tcp(NW_PARAMETERS_DISABLE_PROTOCOL, NW_PARAMETERS_DEFAULT_CONFIGURATION); } } else if (options->type == AWS_SOCKET_DGRAM) { nw_socket->socket_options_to_params = - nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL, NW_PARAMETERS_DEFAULT_CONFIGURATION); + nw_parameters_create_secure_udp(NW_PARAMETERS_DEFAULT_CONFIGURATION, NW_PARAMETERS_DEFAULT_CONFIGURATION); } if (!nw_socket->socket_options_to_params) { @@ -372,7 +381,8 @@ static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_t struct nw_socket_readable_args *args = arg; struct nw_socket *nw_socket = args->socket->impl; - nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data); + if (nw_socket->on_readable) + nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data); aws_mem_release(args->allocator, task); aws_mem_release(args->allocator, args); @@ -399,7 +409,8 @@ static void s_process_connection_success_task(struct aws_task *task, void *arg, struct nw_socket_readable_args *args = arg; struct nw_socket *nw_socket = args->socket->impl; - nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data); + if (nw_socket->on_connection_result_fn) + nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data); aws_mem_release(args->allocator, task); aws_mem_release(args->allocator, args); @@ -419,6 +430,40 @@ static void s_schedule_on_connection_success(struct aws_socket *socket, int erro aws_event_loop_schedule_task_now(socket->event_loop, task); } +static void s_process_listener_success_task(struct aws_task *task, void *args, enum aws_task_status status) { + // TODO: WAHT IF THE TASK IS CANCELED??? + + (void)status; + struct nw_socket_listener_args *listener_args = args; + + if (listener_args->socket->accept_result_fn) + listener_args->socket->accept_result_fn( + listener_args->socket, listener_args->error_code, listener_args->new_socket, listener_args->user_data); + + aws_mem_release(listener_args->allocator, task); + aws_mem_release(listener_args->allocator, listener_args); +} + +static void s_schedule_on_listener_success( + struct aws_socket *socket, + int error_code, + struct aws_socket *new_socket, + void *user_data) { + + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + ; + struct nw_socket_listener_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); + + args->socket = socket; + args->allocator = socket->allocator; + args->error_code = error_code; + args->new_socket = new_socket; + args->user_data = user_data; + + aws_task_init(task, s_process_listener_success_task, args, "listenerSuccessTask"); + aws_event_loop_schedule_task_now(socket->event_loop, task); +} + static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) { // TODO: WAHT IF THE TASK IS CANCELED??? @@ -455,7 +500,8 @@ static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task (void)status; struct nw_socket_written_args *args = arg; - args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data); + if (args->written_fn) + args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data); aws_mem_release(args->allocator, task); aws_mem_release(args->allocator, args); @@ -622,18 +668,20 @@ static int s_socket_connect_fn( "id=%p handle=%p: connection success", (void *)socket, socket->io_handle.data.handle); - nw_socket->socket_open = true; + nw_socket->currently_connected = true; nw_path_t path = nw_connection_copy_current_path(socket->io_handle.data.handle); nw_endpoint_t local_endpoint = nw_path_copy_effective_local_endpoint(path); nw_release(path); const char *hostname = nw_endpoint_get_hostname(local_endpoint); uint16_t port = nw_endpoint_get_port(local_endpoint); - size_t hostname_len = strlen(hostname); - size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address); - size_t to_copy = aws_min_size(hostname_len, buffer_size); - memcpy(socket->local_endpoint.address, hostname, to_copy); - socket->local_endpoint.port = port; + if (hostname != NULL) { + size_t hostname_len = strlen(hostname); + size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address); + size_t to_copy = aws_min_size(hostname_len, buffer_size); + memcpy(socket->local_endpoint.address, hostname, to_copy); + socket->local_endpoint.port = port; + } nw_release(local_endpoint); AWS_LOGF_DEBUG( @@ -886,6 +934,45 @@ static int s_socket_start_accept_fn( socket->connect_accept_user_data = user_data; __block struct aws_allocator *allocator = socket->allocator; + nw_listener_set_state_changed_handler( + socket->io_handle.data.handle, ^(nw_listener_state_t state, nw_error_t error) { + errno = error ? nw_error_get_error_code(error) : 0; + if (state == nw_listener_state_waiting) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: lisnter on port waiting ", + (void *)socket, + socket->io_handle.data.handle); + + } else if (state == nw_listener_state_failed) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: lisnter on port failed ", + (void *)socket, + socket->io_handle.data.handle); + /* any error, including if closed remotely in error */ + int error_code = nw_error_get_error_code(error); + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: connection error %d", + (void *)socket, + socket->io_handle.data.handle, + error_code); + } else if (state == nw_listener_state_ready) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: lisnter on port ready ", + (void *)socket, + socket->io_handle.data.handle); + } else if (state == nw_listener_state_cancelled) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: lisnter on port cancelled ", + (void *)socket, + socket->io_handle.data.handle); + } + }); + nw_listener_set_new_connection_handler(socket->io_handle.data.handle, ^(nw_connection_t connection) { /* invoked upon an incoming connection. In BSD/Posix land this is the result of an * accept() call. */ @@ -907,13 +994,22 @@ static int s_socket_start_accept_fn( const char *hostname = nw_endpoint_get_hostname(endpoint); uint16_t port = nw_endpoint_get_port(endpoint); - size_t hostname_len = strlen(hostname); - size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address); - size_t to_copy = aws_min_size(hostname_len, buffer_size); - memcpy(new_socket->remote_endpoint.address, hostname, to_copy); - new_socket->remote_endpoint.port = port; + if (hostname != NULL) { + size_t hostname_len = strlen(hostname); + size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address); + size_t to_copy = aws_min_size(hostname_len, buffer_size); + memcpy(new_socket->remote_endpoint.address, hostname, to_copy); + new_socket->remote_endpoint.port = port; + } nw_release(endpoint); + // Setup socket state to start read/write operations. + new_socket->state = CONNECTED_READ | CONNECTED_WRITE; + struct nw_socket *new_nw_socket = new_socket->impl; + new_nw_socket->nw_connection = connection; + new_nw_socket->setup_run = true; + new_nw_socket->currently_connected = true; + AWS_LOGF_INFO( AWS_LS_IO_SOCKET, "id=%p handle=%p: connected to %s:%d, incoming handle %p", @@ -922,7 +1018,7 @@ static int s_socket_start_accept_fn( new_socket->remote_endpoint.address, new_socket->remote_endpoint.port, new_socket->io_handle.data.handle); - on_accept_result(socket, AWS_OP_SUCCESS, new_socket, user_data); + s_schedule_on_listener_success(socket, AWS_OP_SUCCESS, new_socket, user_data); }); nw_listener_start(socket->io_handle.data.handle); return AWS_OP_SUCCESS; @@ -959,14 +1055,14 @@ static int s_socket_close_fn(struct aws_socket *socket) { nw_listener_cancel(socket->io_handle.data.handle); } else { - if (nw_socket->socket_open) { + if (nw_socket->currently_connected) { nw_connection_cancel(socket->io_handle.data.handle); } /* Setting to NULL removes previously set handler from nw_connection_t */ nw_connection_set_state_changed_handler(socket->io_handle.data.handle, NULL); } - nw_socket->socket_open = false; + nw_socket->currently_connected = false; return AWS_OP_SUCCESS; } @@ -1058,7 +1154,7 @@ static void s_schedule_next_read(struct aws_socket *socket) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: read cb invoked", (void *)socket, socket->io_handle.data.handle); - if (!nw_socket->socket_open) { + if (!nw_socket->currently_connected) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); aws_raise_error(AWS_IO_SOCKET_CLOSED); @@ -1228,7 +1324,7 @@ static int s_socket_write_fn( (void *)socket, socket->io_handle.data.handle); - if (!nw_socket->socket_open) { + if (!nw_socket->currently_connected) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); // As the socket is not open, we no longer have access to the event loop to schedule tasks @@ -1288,3 +1384,38 @@ static bool s_socket_is_open_fn(struct aws_socket *socket) { return nw_socket->last_error == AWS_OP_SUCCESS; } + +void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint *endpoint) { + struct aws_uuid uuid; + AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS); + char uuid_str[AWS_UUID_STR_LEN] = {0}; + struct aws_byte_buf uuid_buf = aws_byte_buf_from_empty_array(uuid_str, sizeof(uuid_str)); + AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS); + snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".local", AWS_BYTE_BUF_PRI(uuid_buf)); +} + +int aws_socket_init_poll_based( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options) { + (void)socket; + (void)alloc; + (void)options; + + AWS_FATAL_ASSERT(!"This socket type is not implemented for this build configuration. You have selected a " + "poll-based socket, but no poll-based implementation is available"); + return aws_raise_error(AWS_ERROR_UNIMPLEMENTED); +} + +int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) { + if (socket->local_endpoint.address[0] == 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: Socket has no local address. Socket must be bound first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + *out_address = socket->local_endpoint; + return AWS_OP_SUCCESS; +}