Skip to content

Commit

Permalink
merge latest nw_socket
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Mar 6, 2025
2 parents 5b87289 + 94e7a5b commit 0d1dd8e
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 207 deletions.
9 changes: 5 additions & 4 deletions include/aws/io/channel_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ struct aws_server_socket_channel_bootstrap_options {
uint32_t port;
const struct aws_socket_options *socket_options;
const struct aws_tls_connection_options *tls_options;
aws_server_bootstrap_on_listener_setup_fn *setup_callback;
aws_server_bootstrap_on_accept_channel_setup_fn *incoming_callback;
aws_server_bootstrap_on_accept_channel_shutdown_fn *shutdown_callback;
aws_server_bootstrap_on_listener_setup_fn *setup_callback;
aws_server_bootstrap_on_server_listener_destroy_fn *destroy_callback;
bool enable_read_back_pressure;
void *user_data;
Expand Down Expand Up @@ -297,6 +297,10 @@ AWS_IO_API int aws_server_bootstrap_set_alpn_callback(
* shutting down. Immediately after the `shutdown_callback` returns, the channel is cleaned up automatically. All
* callbacks are invoked the thread of the event-loop that the listening socket is assigned to
*
* `setup_callback`. If set, the callback will be asynchronously invoked when the listener is ready for use. For Apple
* Network Framework, the listener is not usable until the callback is invoked. If the listener creation failed
* (return NULL), the `setup_callback` will not be invoked.
*
* Upon shutdown of your application, you'll want to call `aws_server_bootstrap_destroy_socket_listener` with the return
* value from this function.
*
Expand All @@ -305,9 +309,6 @@ AWS_IO_API int aws_server_bootstrap_set_alpn_callback(
AWS_IO_API struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options);

AWS_IO_API struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options);

/**
* Shuts down 'listener' and cleans up any resources associated with it. Any incoming channels on `listener` will still
* be active. `destroy_callback` will be invoked after the server socket listener is destroyed, and all associated
Expand Down
4 changes: 2 additions & 2 deletions include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ struct aws_socket_listener_options {
void *on_accept_result_user_data;

// This callback is invoked when the listener starts accepting incoming connections.
// It is only triggered in asynchronous listener APIs while using nw_socket.
aws_socket_on_accept_started_fn *on_accept_start_result;
// If the callback set, the socket must not be released before the callback invoked.
aws_socket_on_accept_started_fn *on_accept_start;
void *on_accept_start_user_data;
};

Expand Down
171 changes: 25 additions & 146 deletions source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1790,143 +1790,6 @@ static void s_listener_destroy_task(struct aws_task *task, void *arg, enum aws_t
aws_socket_clean_up(&server_connection_args->listener);
}

struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
AWS_PRECONDITION(bootstrap_options);
AWS_PRECONDITION(bootstrap_options->bootstrap);
AWS_PRECONDITION(bootstrap_options->incoming_callback);
AWS_PRECONDITION(bootstrap_options->shutdown_callback);

struct server_connection_args *server_connection_args =
aws_mem_calloc(bootstrap_options->bootstrap->allocator, 1, sizeof(struct server_connection_args));
if (!server_connection_args) {
return NULL;
}

AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL_BOOTSTRAP,
"id=%p: attempting to initialize a new "
"server socket listener for %s:%u",
(void *)bootstrap_options->bootstrap,
bootstrap_options->host_name,
bootstrap_options->port);

aws_ref_count_init(
&server_connection_args->ref_count,
server_connection_args,
(aws_simple_completion_callback *)s_server_connection_args_destroy);
server_connection_args->user_data = bootstrap_options->user_data;
server_connection_args->bootstrap = aws_server_bootstrap_acquire(bootstrap_options->bootstrap);
server_connection_args->shutdown_callback = bootstrap_options->shutdown_callback;
server_connection_args->incoming_callback = bootstrap_options->incoming_callback;
server_connection_args->destroy_callback = bootstrap_options->destroy_callback;
server_connection_args->on_protocol_negotiated = bootstrap_options->bootstrap->on_protocol_negotiated;
server_connection_args->enable_read_back_pressure = bootstrap_options->enable_read_back_pressure;
server_connection_args->retrieve_tls_options = s_retrieve_server_tls_options;

aws_task_init(
&server_connection_args->listener_destroy_task,
s_listener_destroy_task,
server_connection_args,
"listener socket destroy");

if (bootstrap_options->tls_options) {
AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL_BOOTSTRAP, "id=%p: using tls on listener", (void *)bootstrap_options->tls_options);
if (aws_tls_connection_options_copy(&server_connection_args->tls_options, bootstrap_options->tls_options)) {
goto cleanup_server_connection_args;
}

server_connection_args->use_tls = true;

server_connection_args->tls_user_data = bootstrap_options->tls_options->user_data;

/* in order to honor any callbacks a user may have installed on their tls_connection_options,
* we need to wrap them if they were set.*/
if (bootstrap_options->bootstrap->on_protocol_negotiated) {
server_connection_args->tls_options.advertise_alpn_message = true;
}

if (bootstrap_options->tls_options->on_data_read) {
server_connection_args->user_on_data_read = bootstrap_options->tls_options->on_data_read;
server_connection_args->tls_options.on_data_read = s_tls_server_on_data_read;
}

if (bootstrap_options->tls_options->on_error) {
server_connection_args->user_on_error = bootstrap_options->tls_options->on_error;
server_connection_args->tls_options.on_error = s_tls_server_on_error;
}

if (bootstrap_options->tls_options->on_negotiation_result) {
server_connection_args->user_on_negotiation_result = bootstrap_options->tls_options->on_negotiation_result;
}

server_connection_args->tls_options.on_negotiation_result = s_tls_server_on_negotiation_result;
server_connection_args->tls_options.user_data = server_connection_args;
}

struct aws_event_loop *connection_loop =
aws_event_loop_group_get_next_loop(bootstrap_options->bootstrap->event_loop_group);

if (aws_socket_init(
&server_connection_args->listener,
bootstrap_options->bootstrap->allocator,
bootstrap_options->socket_options)) {
goto cleanup_server_connection_args;
}

struct aws_socket_endpoint endpoint;
AWS_ZERO_STRUCT(endpoint);
size_t host_name_len = 0;
if (aws_secure_strlen(bootstrap_options->host_name, sizeof(endpoint.address), &host_name_len)) {
goto cleanup_server_connection_args;
}

memcpy(endpoint.address, bootstrap_options->host_name, host_name_len);
endpoint.port = bootstrap_options->port;

if (aws_socket_bind(
&server_connection_args->listener, &endpoint, s_retrieve_server_tls_options, server_connection_args)) {
goto cleanup_listener;
}

if (aws_socket_listen(&server_connection_args->listener, 1024)) {
goto cleanup_listener;
}

struct aws_socket_listener_options options = {
.on_accept_result = s_on_server_connection_result,
.on_accept_result_user_data = server_connection_args,
.on_accept_start_result = NULL,
.on_accept_start_user_data = NULL,
};

if (aws_socket_start_accept(&server_connection_args->listener, connection_loop, options)) {
goto cleanup_listener;
}

return &server_connection_args->listener;

cleanup_listener:

; // This line just used to avoid expression error after the label

SETUP_SOCKET_SHUTDOWN_CALLBACKS(
bootstrap_options->bootstrap->allocator,
&server_connection_args->listener,
socket_shutdown_release_server_connection_args,
s_socket_shutdown_complete_release_server_connection_fn,
server_connection_args)

aws_socket_clean_up(&server_connection_args->listener);
return NULL;

cleanup_server_connection_args:
s_server_connection_args_release(server_connection_args);

return NULL;
}

/* Called when a listener connection attempt task completes.
*/
static void s_on_listener_connection_established(struct aws_socket *socket, int error_code, void *user_data) {
Expand Down Expand Up @@ -1961,13 +1824,16 @@ static void s_on_listener_connection_established(struct aws_socket *socket, int
return;
}

struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
struct aws_socket *s_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options,
bool async_setup) {
AWS_PRECONDITION(bootstrap_options);
AWS_PRECONDITION(bootstrap_options->bootstrap);
AWS_PRECONDITION(bootstrap_options->incoming_callback);
AWS_PRECONDITION(bootstrap_options->shutdown_callback);
AWS_PRECONDITION(bootstrap_options->setup_callback);
if (async_setup) {
AWS_PRECONDITION(bootstrap_options->setup_callback);
}

struct server_connection_args *server_connection_args =
aws_mem_calloc(bootstrap_options->bootstrap->allocator, 1, sizeof(struct server_connection_args));
Expand Down Expand Up @@ -2066,18 +1932,26 @@ struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
goto cleanup_listener;
}

// Acquire for listener establish callbacks, should be released in `s_on_listener_connection_established`
s_server_connection_args_acquire(server_connection_args);

struct aws_socket_listener_options options = {
.on_accept_result = s_on_server_connection_result,
.on_accept_result_user_data = server_connection_args,
.on_accept_start_result = s_on_listener_connection_established,
.on_accept_start_user_data = server_connection_args,
.on_accept_start = NULL,
.on_accept_start_user_data = NULL,
};

if (async_setup) {
// If we use an async socket, acquire the connection args for listener establish callbacks, if
// aws_socket_start_accept succeed, the args should be released in `s_on_listener_connection_established`
s_server_connection_args_acquire(server_connection_args);
options.on_accept_start = s_on_listener_connection_established;
options.on_accept_start_user_data = server_connection_args;
}

if (aws_socket_start_accept(&server_connection_args->listener, connection_loop, options)) {
s_server_connection_args_release(server_connection_args);
if (async_setup) {
// release the args we acquired above
s_server_connection_args_release(server_connection_args);
}
goto cleanup_listener;
}

Expand All @@ -2103,6 +1977,11 @@ struct aws_socket *aws_server_bootstrap_new_socket_listener_async(
return NULL;
}

struct aws_socket *aws_server_bootstrap_new_socket_listener(
const struct aws_server_socket_channel_bootstrap_options *bootstrap_options) {
return s_server_bootstrap_new_socket_listener(bootstrap_options, bootstrap_options->setup_callback);
}

void aws_server_bootstrap_destroy_socket_listener(struct aws_server_bootstrap *bootstrap, struct aws_socket *listener) {
struct server_connection_args *server_connection_args =
AWS_CONTAINER_OF(listener, struct server_connection_args, listener);
Expand Down
35 changes: 13 additions & 22 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -2105,7 +2105,6 @@ static int s_socket_listen_fn(struct aws_socket *socket, int backlog_size) {
struct listener_state_changed_args {
struct aws_task task;
struct aws_allocator *allocator;
struct aws_socket *socket;
struct nw_socket *nw_socket;
nw_listener_state_t state;
int error;
Expand All @@ -2128,10 +2127,9 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
(void *)nw_socket,
(void *)nw_listener);

/* Ideally we should not have a canceled task here, as nw_socket keeps a reference to event loop, therefore the
* event loop should never be destroyed before the nw_socket get destroyed. If we manually cancel the task, we
* should make sure we carefully handled the state change eventually, as the socket relies on this task to
* release and cleanup.
/* Ideally we should not have a task with AWS_TASK_STATUS_CANCELED here, as the event loop should never be destroyed
* before the nw_socket get destroyed. If we manually cancel the task, we should make sure we carefully handled the
* state change eventually, as the socket relies on this task to release and cleanup.
*/
if (status != AWS_TASK_STATUS_CANCELED) {

Expand All @@ -2152,10 +2150,10 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
crt_error_code);

s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, listener_state_changed_args->socket, ERROR);
s_set_socket_state(nw_socket, aws_socket, ERROR);
s_unlock_socket_synced_data(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
if (nw_socket->on_accept_started_fn) {
nw_socket->on_accept_started_fn(
aws_socket, crt_error_code, nw_socket->listen_accept_started_user_data);
Expand Down Expand Up @@ -2186,9 +2184,12 @@ static void s_process_listener_state_changed_task(struct aws_task *task, void *a
case nw_listener_state_cancelled: {
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "id=%p handle=%p: listener cancelled.", (void *)nw_socket, (void *)nw_listener);
s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, listener_state_changed_args->socket, CLOSED);
s_set_socket_state(nw_socket, aws_socket, CLOSED);
s_unlock_socket_synced_data(nw_socket);
s_unlock_base_socket(nw_socket);
s_socket_release_internal_ref(nw_socket);
} break;
default:
Expand Down Expand Up @@ -2219,13 +2220,10 @@ static void s_handle_listener_state_changed_fn(
nw_error_code,
crt_error_code);

s_lock_base_socket(nw_socket);
struct aws_socket *aws_socket = nw_socket->base_socket_synced_data.base_socket;
if (aws_socket && s_validate_event_loop(nw_socket->event_loop)) {
if (s_validate_event_loop(nw_socket->event_loop)) {
struct listener_state_changed_args *args =
aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct listener_state_changed_args));

args->socket = aws_socket;
args->nw_socket = nw_socket;
args->allocator = nw_socket->allocator;
args->error = crt_error_code;
Expand All @@ -2234,16 +2232,9 @@ static void s_handle_listener_state_changed_fn(
s_socket_acquire_internal_ref(nw_socket);
aws_task_init(&args->task, s_process_listener_state_changed_task, args, "ListenerStateChangedTask");
aws_event_loop_schedule_task_now(nw_socket->event_loop, &args->task);
} else if (state == nw_listener_state_cancelled) {
// If socket is already destroyed and the listener is canceled, directly closed the internal socket.
s_lock_socket_synced_data(nw_socket);
s_set_socket_state(nw_socket, aws_socket, CLOSED);
s_unlock_socket_synced_data(nw_socket);

s_socket_release_internal_ref(nw_socket);
} else {
AWS_FATAL_ASSERT(true && "The nw_socket should be always attached to a validate event loop.");
}

s_unlock_base_socket(nw_socket);
}

static int s_socket_start_accept_fn(
Expand Down Expand Up @@ -2280,7 +2271,7 @@ static int s_socket_start_accept_fn(
socket->accept_result_fn = options.on_accept_result;
socket->connect_accept_user_data = options.on_accept_result_user_data;

nw_socket->on_accept_started_fn = options.on_accept_start_result;
nw_socket->on_accept_started_fn = options.on_accept_start;
nw_socket->listen_accept_started_user_data = options.on_accept_start_user_data;

s_set_event_loop(socket, accept_loop);
Expand Down
Loading

0 comments on commit 0d1dd8e

Please sign in to comment.