Skip to content

Commit

Permalink
fix nw socket listener
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Oct 1, 2024
1 parent 789d446 commit 42ea938
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 23 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ elseif (APPLE)

file(GLOB AWS_IO_OS_SRC
"source/bsd/*.c"
"source/posix/*.c"
"source/posix/pipe.c"
"source/posix/host_resolver.c"
"source/posix/shared_library.c"
"source/darwin/darwin_pki_utils.c"
"source/darwin/secure_transport_tls_channel_handler.c"
)
Expand All @@ -144,6 +146,9 @@ elseif (APPLE)
)
list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC})
else ()
file(GLOB AWS_IO_DISPATCH_QUEUE_SRC
"source/posix/socket.c"
)
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
endif()

Expand Down
175 changes: 153 additions & 22 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <aws/common/clock.h>
#include <aws/common/string.h>
#include <aws/common/uuid.h>
#include <aws/io/logging.h>

#include <Network/Network.h>
Expand Down Expand Up @@ -75,6 +76,14 @@ enum socket_state {
CLOSED,
};

struct nw_socket_listener_args {
int error_code;
struct aws_allocator *allocator;
struct aws_socket *socket;
struct aws_socket *new_socket;
void *user_data;
};

struct nw_socket_timeout_args {
struct aws_task task;
struct aws_allocator *allocator;
Expand Down Expand Up @@ -112,7 +121,7 @@ struct nw_socket {
aws_socket_on_readable_fn *on_readable;
void *on_readable_user_data;
bool setup_run;
bool socket_open;
bool currently_connected; // If the io port is connected. Similar to posix_socket->currently_subscribed.
bool read_queued;
bool is_listener;
struct nw_socket_timeout_args *timeout_args;
Expand Down Expand Up @@ -162,11 +171,11 @@ static int s_setup_socket_params(struct nw_socket *nw_socket, const struct aws_s
});
} else if (options->domain == AWS_SOCKET_LOCAL) {
nw_socket->socket_options_to_params =
nw_parameters_create_custom_ip(AF_LOCAL, NW_PARAMETERS_DEFAULT_CONFIGURATION);
nw_parameters_create_secure_tcp(NW_PARAMETERS_DISABLE_PROTOCOL, NW_PARAMETERS_DEFAULT_CONFIGURATION);
}
} else if (options->type == AWS_SOCKET_DGRAM) {
nw_socket->socket_options_to_params =
nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL, NW_PARAMETERS_DEFAULT_CONFIGURATION);
nw_parameters_create_secure_udp(NW_PARAMETERS_DEFAULT_CONFIGURATION, NW_PARAMETERS_DEFAULT_CONFIGURATION);
}

if (!nw_socket->socket_options_to_params) {
Expand Down Expand Up @@ -372,7 +381,8 @@ static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_t
struct nw_socket_readable_args *args = arg;

struct nw_socket *nw_socket = args->socket->impl;
nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data);
if (nw_socket->on_readable)
nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
Expand All @@ -399,7 +409,8 @@ static void s_process_connection_success_task(struct aws_task *task, void *arg,
struct nw_socket_readable_args *args = arg;

struct nw_socket *nw_socket = args->socket->impl;
nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data);
if (nw_socket->on_connection_result_fn)
nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
Expand All @@ -419,6 +430,40 @@ static void s_schedule_on_connection_success(struct aws_socket *socket, int erro
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static void s_process_listener_success_task(struct aws_task *task, void *args, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_listener_args *listener_args = args;

if (listener_args->socket->accept_result_fn)
listener_args->socket->accept_result_fn(
listener_args->socket, listener_args->error_code, listener_args->new_socket, listener_args->user_data);

aws_mem_release(listener_args->allocator, task);
aws_mem_release(listener_args->allocator, listener_args);
}

static void s_schedule_on_listener_success(
struct aws_socket *socket,
int error_code,
struct aws_socket *new_socket,
void *user_data) {

struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));
;
struct nw_socket_listener_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args));

args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;
args->new_socket = new_socket;
args->user_data = user_data;

aws_task_init(task, s_process_listener_success_task, args, "listenerSuccessTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

Expand Down Expand Up @@ -455,7 +500,8 @@ static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task
(void)status;
struct nw_socket_written_args *args = arg;

args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data);
if (args->written_fn)
args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
Expand Down Expand Up @@ -622,18 +668,20 @@ static int s_socket_connect_fn(
"id=%p handle=%p: connection success",
(void *)socket,
socket->io_handle.data.handle);
nw_socket->socket_open = true;
nw_socket->currently_connected = true;
nw_path_t path = nw_connection_copy_current_path(socket->io_handle.data.handle);
nw_endpoint_t local_endpoint = nw_path_copy_effective_local_endpoint(path);
nw_release(path);
const char *hostname = nw_endpoint_get_hostname(local_endpoint);
uint16_t port = nw_endpoint_get_port(local_endpoint);

size_t hostname_len = strlen(hostname);
size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address);
size_t to_copy = aws_min_size(hostname_len, buffer_size);
memcpy(socket->local_endpoint.address, hostname, to_copy);
socket->local_endpoint.port = port;
if (hostname != NULL) {
size_t hostname_len = strlen(hostname);
size_t buffer_size = AWS_ARRAY_SIZE(socket->local_endpoint.address);
size_t to_copy = aws_min_size(hostname_len, buffer_size);
memcpy(socket->local_endpoint.address, hostname, to_copy);
socket->local_endpoint.port = port;
}
nw_release(local_endpoint);

AWS_LOGF_DEBUG(
Expand Down Expand Up @@ -886,6 +934,45 @@ static int s_socket_start_accept_fn(
socket->connect_accept_user_data = user_data;
__block struct aws_allocator *allocator = socket->allocator;

nw_listener_set_state_changed_handler(
socket->io_handle.data.handle, ^(nw_listener_state_t state, nw_error_t error) {
errno = error ? nw_error_get_error_code(error) : 0;
if (state == nw_listener_state_waiting) {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: lisnter on port waiting ",
(void *)socket,
socket->io_handle.data.handle);

} else if (state == nw_listener_state_failed) {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: lisnter on port failed ",
(void *)socket,
socket->io_handle.data.handle);
/* any error, including if closed remotely in error */
int error_code = nw_error_get_error_code(error);
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: connection error %d",
(void *)socket,
socket->io_handle.data.handle,
error_code);
} else if (state == nw_listener_state_ready) {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: lisnter on port ready ",
(void *)socket,
socket->io_handle.data.handle);
} else if (state == nw_listener_state_cancelled) {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: lisnter on port cancelled ",
(void *)socket,
socket->io_handle.data.handle);
}
});

nw_listener_set_new_connection_handler(socket->io_handle.data.handle, ^(nw_connection_t connection) {
/* invoked upon an incoming connection. In BSD/Posix land this is the result of an
* accept() call. */
Expand All @@ -907,13 +994,22 @@ static int s_socket_start_accept_fn(
const char *hostname = nw_endpoint_get_hostname(endpoint);
uint16_t port = nw_endpoint_get_port(endpoint);

size_t hostname_len = strlen(hostname);
size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address);
size_t to_copy = aws_min_size(hostname_len, buffer_size);
memcpy(new_socket->remote_endpoint.address, hostname, to_copy);
new_socket->remote_endpoint.port = port;
if (hostname != NULL) {
size_t hostname_len = strlen(hostname);
size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address);
size_t to_copy = aws_min_size(hostname_len, buffer_size);
memcpy(new_socket->remote_endpoint.address, hostname, to_copy);
new_socket->remote_endpoint.port = port;
}
nw_release(endpoint);

// Setup socket state to start read/write operations.
new_socket->state = CONNECTED_READ | CONNECTED_WRITE;
struct nw_socket *new_nw_socket = new_socket->impl;
new_nw_socket->nw_connection = connection;
new_nw_socket->setup_run = true;
new_nw_socket->currently_connected = true;

AWS_LOGF_INFO(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: connected to %s:%d, incoming handle %p",
Expand All @@ -922,7 +1018,7 @@ static int s_socket_start_accept_fn(
new_socket->remote_endpoint.address,
new_socket->remote_endpoint.port,
new_socket->io_handle.data.handle);
on_accept_result(socket, AWS_OP_SUCCESS, new_socket, user_data);
s_schedule_on_listener_success(socket, AWS_OP_SUCCESS, new_socket, user_data);
});
nw_listener_start(socket->io_handle.data.handle);
return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -959,14 +1055,14 @@ static int s_socket_close_fn(struct aws_socket *socket) {
nw_listener_cancel(socket->io_handle.data.handle);

} else {
if (nw_socket->socket_open) {
if (nw_socket->currently_connected) {
nw_connection_cancel(socket->io_handle.data.handle);
}

/* Setting to NULL removes previously set handler from nw_connection_t */
nw_connection_set_state_changed_handler(socket->io_handle.data.handle, NULL);
}
nw_socket->socket_open = false;
nw_socket->currently_connected = false;

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -1058,7 +1154,7 @@ static void s_schedule_next_read(struct aws_socket *socket) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: read cb invoked", (void *)socket, socket->io_handle.data.handle);

if (!nw_socket->socket_open) {
if (!nw_socket->currently_connected) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle);
aws_raise_error(AWS_IO_SOCKET_CLOSED);
Expand Down Expand Up @@ -1228,7 +1324,7 @@ static int s_socket_write_fn(
(void *)socket,
socket->io_handle.data.handle);

if (!nw_socket->socket_open) {
if (!nw_socket->currently_connected) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle);
// As the socket is not open, we no longer have access to the event loop to schedule tasks
Expand Down Expand Up @@ -1288,3 +1384,38 @@ static bool s_socket_is_open_fn(struct aws_socket *socket) {

return nw_socket->last_error == AWS_OP_SUCCESS;
}

void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint *endpoint) {
struct aws_uuid uuid;
AWS_FATAL_ASSERT(aws_uuid_init(&uuid) == AWS_OP_SUCCESS);
char uuid_str[AWS_UUID_STR_LEN] = {0};
struct aws_byte_buf uuid_buf = aws_byte_buf_from_empty_array(uuid_str, sizeof(uuid_str));
AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS);
snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".local", AWS_BYTE_BUF_PRI(uuid_buf));
}

int aws_socket_init_poll_based(
struct aws_socket *socket,
struct aws_allocator *alloc,
const struct aws_socket_options *options) {
(void)socket;
(void)alloc;
(void)options;

AWS_FATAL_ASSERT(!"This socket type is not implemented for this build configuration. You have selected a "
"poll-based socket, but no poll-based implementation is available");
return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
}

int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) {
if (socket->local_endpoint.address[0] == 0) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p fd=%d: Socket has no local address. Socket must be bound first.",
(void *)socket,
socket->io_handle.data.fd);
return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
}
*out_address = socket->local_endpoint;
return AWS_OP_SUCCESS;
}

0 comments on commit 42ea938

Please sign in to comment.