Skip to content

Commit

Permalink
add socket vtable
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Oct 2, 2024
1 parent e8fe46d commit 2f233bb
Show file tree
Hide file tree
Showing 8 changed files with 620 additions and 391 deletions.
15 changes: 11 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ option(BUILD_RELOCATABLE_BINARIES
option(BYO_CRYPTO "Don't build a tls implementation or link against a crypto interface. This feature is only for unix builds currently."
OFF)
# DEBUG: directly set AWS_USE_DISPATCH_QUEUE
set (AWS_USE_DISPATCH_QUEUE ON)
set (AWS_USE_DISPATCH_QUEUE OFF)

file(GLOB AWS_IO_HEADERS
"include/aws/io/*.h"
Expand Down Expand Up @@ -117,7 +117,9 @@ elseif (APPLE)

file(GLOB AWS_IO_OS_SRC
"source/bsd/*.c"
"source/posix/*.c"
"source/posix/pipe.c"
"source/posix/host_resolver.c"
"source/posix/shared_library.c"
"source/darwin/darwin_pki_utils.c"
"source/darwin/secure_transport_tls_channel_handler.c"
)
Expand All @@ -140,10 +142,15 @@ elseif (APPLE)
message("use dispatch queue")
file(GLOB AWS_IO_DISPATCH_QUEUE_SRC
"source/darwin/dispatch_queue_event_loop.c"
"source/darwin/nw_socket.c"
)
list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC})
else ()
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
file(GLOB AWS_KQUEUE_SRC
"source/posix/socket.c"
)
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
list(APPEND AWS_IO_OS_SRC ${AWS_KQUEUE_SRC})
endif()

elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetBSD" OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
Expand Down Expand Up @@ -252,4 +259,4 @@ if (NOT CMAKE_CROSSCOMPILING)
if (BUILD_TESTING)
add_subdirectory(tests)
endif()
endif()
endif()
47 changes: 45 additions & 2 deletions include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

#include <aws/io/channel.h>
#include <aws/io/event_loop.h>
#include <aws/io/io.h>

AWS_PUSH_SANE_WARNING_LEVEL
Expand Down Expand Up @@ -78,7 +79,7 @@ typedef void(aws_socket_on_connection_result_fn)(struct aws_socket *socket, int
* A user may want to call aws_socket_set_options() on the new socket if different options are desired.
*
* new_socket is not yet assigned to an event-loop. The user should call aws_socket_assign_to_event_loop() before
* performing IO operations.
* performing IO operations. The user is resposnbile to releasing the socket memory after use.
*
* When error_code is AWS_ERROR_SUCCESS, new_socket is the recently accepted connection.
* If error_code is non-zero, an error occurred and you should aws_socket_close() the socket.
Expand Down Expand Up @@ -114,7 +115,44 @@ struct aws_socket_endpoint {
uint32_t port;
};

struct aws_socket;

struct aws_socket_vtable {
void (*socket_cleanup_fn)(struct aws_socket *socket);
int (*socket_connect_fn)(
struct aws_socket *socket,
const struct aws_socket_endpoint *remote_endpoint,
struct aws_event_loop *event_loop,
aws_socket_on_connection_result_fn *on_connection_result,
void *user_data);
int (*socket_bind_fn)(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint);
int (*socket_listen_fn)(struct aws_socket *socket, int backlog_size);
int (*socket_start_accept_fn)(
struct aws_socket *socket,
struct aws_event_loop *accept_loop,
aws_socket_on_accept_result_fn *on_accept_result,
void *user_data);
int (*socket_stop_accept_fn)(struct aws_socket *socket);
int (*socket_close_fn)(struct aws_socket *socket);
int (*socket_shutdown_dir_fn)(struct aws_socket *socket, enum aws_channel_direction dir);
int (*socket_set_options_fn)(struct aws_socket *socket, const struct aws_socket_options *options);
int (*socket_assign_to_event_loop_fn)(struct aws_socket *socket, struct aws_event_loop *event_loop);
int (*socket_subscribe_to_readable_events_fn)(
struct aws_socket *socket,
aws_socket_on_readable_fn *on_readable,
void *user_data);
int (*socket_read_fn)(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read);
int (*socket_write_fn)(
struct aws_socket *socket,
const struct aws_byte_cursor *cursor,
aws_socket_on_write_completed_fn *written_fn,
void *user_data);
int (*socket_get_error_fn)(struct aws_socket *socket);
bool (*socket_is_open_fn)(struct aws_socket *socket);
};

struct aws_socket {
struct aws_socket_vtable *vtable;
struct aws_allocator *allocator;
struct aws_socket_endpoint local_endpoint;
struct aws_socket_endpoint remote_endpoint;
Expand All @@ -123,6 +161,7 @@ struct aws_socket {
struct aws_event_loop *event_loop;
struct aws_channel_handler *handler;
int state;
enum aws_event_loop_style event_loop_style;
aws_socket_on_readable_fn *readable_fn;
void *readable_user_data;
aws_socket_on_connection_result_fn *connection_result_fn;
Expand Down Expand Up @@ -172,6 +211,10 @@ AWS_IO_API void aws_socket_clean_up(struct aws_socket *socket);
* In TCP, LOCAL and VSOCK this function will not block. If the return value is successful, then you must wait on the
* `on_connection_result()` callback to be invoked before using the socket.
*
* The function will failed with error if the endpoint is invalid. Except for LOCAL with AWS_USE_DISPATCH_QUEUE (with
* Apple Network Framework enabled). In Apple network framework, we would not know if the local endpoint is valid until
* we have the connection state back, Make sure to validate in `on_connection_result` for this case.
*
* If an event_loop is provided for UDP sockets, a notification will be sent on
* on_connection_result in the event-loop's thread. Upon completion, the socket will already be assigned
* an event loop. If NULL is passed for UDP, it will immediately return upon success, but you must call
Expand Down Expand Up @@ -260,7 +303,7 @@ AWS_IO_API int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct
AWS_IO_API struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket);

/**
* Subscribes on_readable to notifications when the socket goes readable (edge-triggered). Errors will also be recieved
* Subscribes on_readable to notifications when the socket goes readable (edge-triggered). Errors will also be received
* in the callback.
*
* Note! This function is technically not thread safe, but we do not enforce which thread you call from.
Expand Down
20 changes: 8 additions & 12 deletions source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ static bool s_aws_socket_domain_uses_dns(enum aws_socket_domain domain) {
return domain == AWS_SOCKET_IPV4 || domain == AWS_SOCKET_IPV6;
}

/* Called when a socket connection attempt task completes. First socket to successfully open
* assigns itself to connection_args->channel_data.socket and flips connection_args->connection_chosen
* to true. Subsequent successful sockets will be released and cleaned up
*/
static void s_on_client_connection_established(struct aws_socket *socket, int error_code, void *user_data) {
struct client_connection_args *connection_args = user_data;

Expand Down Expand Up @@ -565,7 +569,6 @@ static void s_on_client_connection_established(struct aws_socket *socket, int er
(void *)connection_args->bootstrap,
(void *)socket);
aws_socket_close(socket);

aws_socket_clean_up(socket);
aws_mem_release(connection_args->bootstrap->allocator, socket);

Expand Down Expand Up @@ -813,7 +816,7 @@ int aws_client_bootstrap_new_socket_channel(struct aws_socket_channel_bootstrap_
AWS_FATAL_ASSERT(options->shutdown_callback);
AWS_FATAL_ASSERT(bootstrap);

const struct aws_socket_options *socket_options = options->socket_options;
struct aws_socket_options *socket_options = (struct aws_socket_options *)options->socket_options;
AWS_FATAL_ASSERT(socket_options != NULL);

const struct aws_tls_connection_options *tls_options = options->tls_options;
Expand All @@ -832,10 +835,6 @@ int aws_client_bootstrap_new_socket_channel(struct aws_socket_channel_bootstrap_
struct client_connection_args *client_connection_args =
aws_mem_calloc(bootstrap->allocator, 1, sizeof(struct client_connection_args));

if (!client_connection_args) {
return AWS_OP_ERR;
}

const char *host_name = options->host_name;
uint32_t port = options->port;

Expand Down Expand Up @@ -1361,9 +1360,7 @@ void s_on_server_connection_result(
(void *)socket);
struct server_channel_data *channel_data =
aws_mem_calloc(connection_args->bootstrap->allocator, 1, sizeof(struct server_channel_data));
if (!channel_data) {
goto error_cleanup;
}

channel_data->incoming_called = false;
channel_data->socket = new_socket;
channel_data->server_connection_args = connection_args;
Expand All @@ -1376,11 +1373,10 @@ void s_on_server_connection_result(
.setup_user_data = channel_data,
.shutdown_user_data = channel_data,
.on_shutdown_completed = s_on_server_channel_on_shutdown,
.event_loop = event_loop,
.enable_read_back_pressure = channel_data->server_connection_args->enable_read_back_pressure,
};

channel_args.event_loop = event_loop;
channel_args.enable_read_back_pressure = channel_data->server_connection_args->enable_read_back_pressure;

if (aws_socket_assign_to_event_loop(new_socket, event_loop)) {
aws_mem_release(connection_args->bootstrap->allocator, (void *)channel_data);
goto error_cleanup;
Expand Down
Loading

0 comments on commit 2f233bb

Please sign in to comment.