Skip to content

Commit

Permalink
fix merge error and socket completion hanlder changes
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 26, 2024
1 parent 86b4343 commit 9471d05
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 52 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ elseif (APPLE)
file(GLOB AWS_IO_OS_SRC
"source/bsd/*.c"
"source/posix/*.c"
"source/darwin/nw_socket.c"
"source/darwin/darwin_pki_utils.c"
"source/darwin/secure_transport_tls_channel_handler.c"
)
Expand Down
2 changes: 2 additions & 0 deletions include/aws/io/private/dispatch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/

#include <Security/Security.h>
#include <aws/common/mutex.h>
#include <aws/common/thread.h>
#include <aws/io/tls_channel_handler.h>
#include <dispatch/dispatch.h>

Expand Down
255 changes: 203 additions & 52 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ struct nw_socket_timeout_args {
struct aws_socket *socket;
};

struct nw_socket_readable_args {
int error_code;
struct aws_allocator *allocator;
struct aws_socket *socket;
};

struct nw_socket_written_args {
int error_code;
struct aws_allocator *allocator;
struct aws_socket *socket;
aws_socket_on_write_completed_fn *written_fn;
void *user_data;
size_t bytes_written;
};

struct nw_socket_cancel_task_args {
struct aws_allocator *allocator;
struct aws_socket *socket;
struct aws_task *task_to_cancel;
};

struct nw_socket {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
Expand Down Expand Up @@ -344,6 +365,124 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_
}
}

static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_readable_args *args = arg;

struct nw_socket *nw_socket = args->socket->impl;
nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}

static void s_schedule_on_readable(struct aws_socket *socket, int error_code) {
struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));
;
struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args));

args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;

aws_task_init(task, s_process_readable_task, args, "readableTask");

aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static void s_process_connection_success_task(struct aws_task *task, void *arg, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_readable_args *args = arg;

struct nw_socket *nw_socket = args->socket->impl;
nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}

static void s_schedule_on_connection_success(struct aws_socket *socket, int error_code) {

struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));
;
struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args));

args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;

aws_task_init(task, s_process_connection_success_task, args, "connectionSuccessTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_cancel_task_args *args = arg;

if (status == AWS_TASK_STATUS_RUN_READY)
aws_event_loop_cancel_task(args->socket->event_loop, args->task_to_cancel);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}

// As cancel task has to run on the same thread & we dont have control on dispatch queue thread,
// we always schedule the cancel task on event loop
static void s_schedule_cancel_task(struct aws_socket *socket, struct aws_task *task_to_cancel) {

struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));
;
struct nw_socket_cancel_task_args *args =
aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_cancel_task_args));

args->socket = socket;
args->allocator = socket->allocator;
args->task_to_cancel = task_to_cancel;

aws_task_init(task, s_process_cancel_task, args, "cancelTaskTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task_status status) {
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_written_args *args = arg;

args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}

static void s_schedule_write_fn(
struct aws_socket *socket,
int error_code,
size_t bytes_written,
void *user_data,
aws_socket_on_write_completed_fn *written_fn) {

struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));
;
struct nw_socket_written_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_written_args));

args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;
args->written_fn = written_fn;
args->user_data = user_data;
args->bytes_written = bytes_written;

aws_task_init(task, s_process_write_task, args, "writtenTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static int s_socket_connect_fn(
struct aws_socket *socket,
const struct aws_socket_endpoint *remote_endpoint,
Expand Down Expand Up @@ -461,9 +600,6 @@ static int s_socket_connect_fn(

nw_socket->on_connection_result_fn = on_connection_result;
nw_socket->connect_accept_user_data = user_data;

struct nw_socket *socket_impl = socket->impl;

nw_socket->timeout_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_timeout_args));

nw_socket->timeout_args->socket = socket;
Expand All @@ -475,8 +611,6 @@ static int s_socket_connect_fn(
nw_socket->timeout_args,
"NWSocketConnectionTimeoutTask");

nw_connection_t handle = socket->io_handle.data.handle;

/* set a handler for socket state changes. This is where we find out if the connection timed out, was successful,
* was disconnected etc .... */
nw_connection_set_state_changed_handler(
Expand Down Expand Up @@ -510,12 +644,13 @@ static int s_socket_connect_fn(
socket->local_endpoint.address,
port);
// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task);
s_schedule_cancel_task(socket, &nw_socket->timeout_args->task);
socket->state = CONNECTED_WRITE | CONNECTED_READ;
nw_socket->setup_run = true;
aws_ref_count_acquire(&nw_socket->ref_count);
on_connection_result(socket, AWS_OP_SUCCESS, user_data);
s_schedule_on_connection_success(socket, AWS_OP_SUCCESS);
aws_ref_count_release(&nw_socket->ref_count);

} else if (error) {
/* any error, including if closed remotely in error */
int error_code = nw_error_get_error_code(error);
Expand All @@ -525,10 +660,8 @@ static int s_socket_connect_fn(
(void *)socket,
socket->io_handle.data.handle,
error_code);

// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task);

s_schedule_cancel_task(socket, &nw_socket->timeout_args->task);
/* we don't let this thing do DNS or TLS. Everything had better be a posix error. */
// AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix);
// DEBUG WIP we do in fact allow this to do TLS
Expand All @@ -538,18 +671,19 @@ static int s_socket_connect_fn(
socket->state = ERROR;
aws_ref_count_acquire(&nw_socket->ref_count);
if (!nw_socket->setup_run) {
on_connection_result(socket, error_code, user_data);
s_schedule_on_connection_success(socket, error_code);
nw_socket->setup_run = true;
} else if (socket->readable_fn) {
socket->readable_fn(socket, nw_socket->last_error, socket->readable_user_data);
s_schedule_on_readable(socket, nw_socket->last_error);
}

aws_ref_count_release(&nw_socket->ref_count);
} else if (state == nw_connection_state_cancelled || state == nw_connection_state_failed) {
/* this should only hit when the socket was closed by not us. Note,
* we uninstall this handler right before calling close on the socket so this shouldn't
* get hit unless it was triggered remotely */
// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->timeout_args->task);
s_schedule_cancel_task(socket, &nw_socket->timeout_args->task);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: socket closed remotely.",
Expand All @@ -559,10 +693,10 @@ static int s_socket_connect_fn(
aws_ref_count_acquire(&nw_socket->ref_count);
aws_raise_error(AWS_IO_SOCKET_CLOSED);
if (!nw_socket->setup_run) {
on_connection_result(socket, AWS_IO_SOCKET_CLOSED, user_data);
s_schedule_on_connection_success(socket, AWS_IO_SOCKET_CLOSED);
nw_socket->setup_run = true;
} else if (socket->readable_fn) {
socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
s_schedule_on_readable(socket, AWS_IO_SOCKET_CLOSED);
}
aws_ref_count_release(&nw_socket->ref_count);
} else if (state == nw_connection_state_waiting) {
Expand Down Expand Up @@ -907,7 +1041,8 @@ static void s_schedule_next_read(struct aws_socket *socket) {
"id=%p handle=%p: cannot read to because it is not connected",
(void *)socket,
socket->io_handle.data.handle);
return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
return;
}

// Acquire nw_socket after we call connection receive, and released it when handler is called.
Expand Down Expand Up @@ -940,7 +1075,8 @@ static void s_schedule_next_read(struct aws_socket *socket) {
(void *)socket,
socket->io_handle.data.handle,
(int)dispatch_data_get_size(data));
nw_socket->on_readable(socket, AWS_ERROR_SUCCESS, nw_socket->on_readable_user_data);

s_schedule_on_readable(socket, AWS_ERROR_SUCCESS);
}
if (!is_complete) {
s_schedule_next_read(socket);
Expand All @@ -956,7 +1092,7 @@ static void s_schedule_next_read(struct aws_socket *socket) {
socket->io_handle.data.handle,
error_code);

nw_socket->on_readable(socket, error_code, nw_socket->on_readable_user_data);
s_schedule_on_readable(socket, error_code);
}
aws_ref_count_release(&nw_socket->ref_count);
});
Expand Down Expand Up @@ -1080,44 +1216,59 @@ static int s_socket_write_fn(

struct nw_socket *nw_socket = socket->impl;
aws_ref_count_acquire(&nw_socket->ref_count);
nw_connection_t handle = socket->io_handle.data.handle;

AWS_ASSERT(written_fn);

dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, NULL, DISPATCH_DATA_DESTRUCTOR_FREE);
nw_connection_send(handle, data, _nw_content_context_default_message, true, ^(nw_error_t error) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: processing write requests, called from aws_socket_write",
(void *)socket,
handle);

if (!nw_socket->socket_open) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, handle);
written_fn(socket, 0, 0, user_data);
goto nw_socket_release;
}

AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET, "id=%p handle=%p: DEBUG:: callback writing message: %p", (void *)socket, handle, user_data);
int error_code = !error || nw_error_get_error_code(error) == 0
? AWS_OP_SUCCESS
: s_determine_socket_error(nw_error_get_error_code(error));

if (error_code) {
nw_socket->last_error = error_code;
aws_raise_error(error_code);
dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
nw_connection_send(
socket->io_handle.data.handle, data, _nw_content_context_default_message, true, ^(nw_error_t error) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: processing write requests, called from aws_socket_write",
(void *)socket,
socket->io_handle.data.handle);

if (!nw_socket->socket_open) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle);
// As the socket is not open, we no longer have access to the event loop to schedule tasks
// directly execute the written callback instead of scheduling a task.
written_fn(socket, 0, 0, user_data);
goto nw_socket_release;
}

AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET, "id=%p handle=%p: error during write %d", (void *)socket, handle, error_code);
}

size_t written_size = dispatch_data_get_size(data);
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: send written size %d", (void *)socket, handle, (int)written_size);
written_fn(socket, error_code, !error_code ? written_size : 0, user_data);
nw_socket_release:
aws_ref_count_release(&nw_socket->ref_count);
});
AWS_LS_IO_SOCKET,
"id=%p handle=%p: DEBUG:: callback writing message: %p",
(void *)socket,
socket->io_handle.data.handle,
user_data);
int error_code = !error || nw_error_get_error_code(error) == 0
? AWS_OP_SUCCESS
: s_determine_socket_error(nw_error_get_error_code(error));

if (error_code) {
nw_socket->last_error = error_code;
aws_raise_error(error_code);
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: error during write %d",
(void *)socket,
socket->io_handle.data.handle,
error_code);
}

size_t written_size = dispatch_data_get_size(data);
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: send written size %d",
(void *)socket,
socket->io_handle.data.handle,
(int)written_size);
s_schedule_write_fn(socket, error_code, !error_code ? written_size : 0, user_data, written_fn);
nw_socket_release:
aws_ref_count_release(&nw_socket->ref_count);
});

return AWS_OP_SUCCESS;
}
Expand Down

0 comments on commit 9471d05

Please sign in to comment.