Skip to content

Commit

Permalink
additional modularization to streamline UDP and TCP flows
Browse files Browse the repository at this point in the history
  • Loading branch information
bradh352 committed Aug 17, 2024
1 parent 4934c7a commit 9277b59
Showing 1 changed file with 93 additions and 79 deletions.
172 changes: 93 additions & 79 deletions src/lib/ares_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@


static void timeadd(ares_timeval_t *now, size_t millisecs);
static void ares_notify_write_fds(ares_channel_t *channel, fd_set *write_fds,
ares_socket_t write_fd);
static void read_packets(ares_channel_t *channel, fd_set *read_fds,
static void process_write(ares_channel_t *channel, fd_set *write_fds,
ares_socket_t write_fd);
static void process_read(ares_channel_t *channel, fd_set *read_fds,
ares_socket_t read_fd, const ares_timeval_t *now);
static void process_timeouts(ares_channel_t *channel,
const ares_timeval_t *now);
static ares_status_t process_answer(ares_channel_t *channel,
const unsigned char *abuf, size_t alen,
ares_conn_t *conn, ares_bool_t tcp,
ares_conn_t *conn,
const ares_timeval_t *now);
static void handle_conn_error(ares_conn_t *conn, ares_bool_t critical_failure,
ares_status_t failure_status);
Expand Down Expand Up @@ -202,9 +202,9 @@ static void processfds(ares_channel_t *channel, fd_set *read_fds,

ares__channel_lock(channel);
ares__tvnow(&now);
read_packets(channel, read_fds, read_fd, &now);
process_read(channel, read_fds, read_fd, &now);
process_timeouts(channel, &now);
ares_notify_write_fds(channel, write_fds, write_fd);
process_write(channel, write_fds, write_fd);

/* See if any connections should be cleaned up */
ares__check_cleanup_conns(channel);
Expand Down Expand Up @@ -287,8 +287,8 @@ fprintf(stderr, "%s(): fd=%d, flags=%d, state_flags=%d\n", __FUNCTION__, (int)co
}
}

static void ares_notify_write_fds(ares_channel_t *channel, fd_set *write_fds,
ares_socket_t write_fd)
static void process_write(ares_channel_t *channel, fd_set *write_fds,
ares_socket_t write_fd)
{
size_t i;
ares_socket_t *socketlist = NULL;
Expand Down Expand Up @@ -380,42 +380,83 @@ void ares_process_pending_write(ares_channel_t *channel)
ares__channel_unlock(channel);
}

/* If any TCP socket selects true for reading, read some data,
* allocate a buffer if we finish reading the length word, and process
* a packet if we finish reading one.
*/
static void read_tcp_data(ares_channel_t *channel, ares_conn_t *conn,
const ares_timeval_t *now)
static ares_status_t read_conn_packets(ares_conn_t *conn)
{
size_t count;
ares_bool_t read_again;
ares_conn_err_t err;
ares_channel_t *channel = conn->server->channel;

/* Fetch buffer to store data we are reading */
size_t ptr_len = 65535;
unsigned char *ptr;
do {
size_t count;
size_t len = 65535;
unsigned char *ptr;
size_t start_len = ares__buf_len(conn->in_buf);

/* If UDP, lets write out a placeholder for the length indicator */
if (!(conn->flags & ARES_CONN_FLAG_TCP)) {
if (ares__buf_append_be16(conn->in_buf, 0) != ARES_SUCCESS) {
handle_conn_error(conn, ARES_FALSE /* not critical to connection */,
ARES_SUCCESS);
return ARES_ENOMEM;
}
}

ptr = ares__buf_append_start(conn->in_buf, &ptr_len);
/* Get a buffer of sufficient size */
ptr = ares__buf_append_start(conn->in_buf, &len);

if (ptr == NULL) {
handle_conn_error(conn, ARES_FALSE /* not critical to connection */,
ARES_SUCCESS);
return; /* bail out on malloc failure. TODO: make this
function return error codes */
}
if (ptr == NULL) {
handle_conn_error(conn, ARES_FALSE /* not critical to connection */,
ARES_SUCCESS);
return ARES_ENOMEM;
}

/* Read from socket */
err = ares__conn_read(conn, ptr, ptr_len, &count);
/* Read from socket */
err = ares__conn_read(conn, ptr, len, &count);

if (err != ARES_CONN_ERR_SUCCESS) {
ares__buf_append_finish(conn->in_buf, 0);
if (err != ARES_CONN_ERR_WOULDBLOCK) {
handle_conn_error(conn, ARES_TRUE, ARES_ECONNREFUSED);
if (err != ARES_CONN_ERR_SUCCESS) {
ares__buf_append_finish(conn->in_buf, 0);
if (!(conn->flags & ARES_CONN_FLAG_TCP)) {
ares__buf_set_length(conn->in_buf, start_len);
}
break;
}
return;

/* Record amount of data read */
ares__buf_append_finish(conn->in_buf, (size_t)count);

/* Only loop if we're not overwriting socket functions, and are using UDP
* or are using TCP and read the maximum buffer size */
read_again = ARES_FALSE;
if (channel->sock_funcs == NULL) {
if (!(conn->flags & ARES_CONN_FLAG_TCP)) {
read_again = ARES_TRUE;
} else if (count == len) {
read_again = ARES_TRUE;
}
}

/* If UDP, overwrite length */
if (!(conn->flags & ARES_CONN_FLAG_TCP)) {
len = ares__buf_len(conn->in_buf);
ares__buf_set_length(conn->in_buf, start_len);
ares__buf_append_be16(conn->in_buf, (unsigned short)count);
ares__buf_set_length(conn->in_buf, len);
}
/* Try to read again only if *we* set up the socket, otherwise it may be
* a blocking socket and would cause recvfrom to hang. */
} while (read_again);

if (err != ARES_CONN_ERR_SUCCESS && err != ARES_CONN_ERR_WOULDBLOCK) {
handle_conn_error(conn, ARES_TRUE, ARES_ECONNREFUSED);
return ARES_ECONNREFUSED;
}

/* Record amount of data read */
ares__buf_append_finish(conn->in_buf, (size_t)count);
return ARES_SUCCESS;
}

static void read_answers(ares_conn_t *conn, const ares_timeval_t *now)
{
ares_channel_t *channel = conn->server->channel;

/* Process all queued answers */
while (1) {
Expand Down Expand Up @@ -451,7 +492,7 @@ static void read_tcp_data(ares_channel_t *channel, ares_conn_t *conn,
data_len -= 2;

/* We finished reading this answer; process it */
status = process_answer(channel, data, data_len, conn, ARES_TRUE, now);
status = process_answer(channel, data, data_len, conn, now);
if (status != ARES_SUCCESS) {
handle_conn_error(conn, ARES_TRUE, status);
return;
Expand All @@ -462,42 +503,25 @@ static void read_tcp_data(ares_channel_t *channel, ares_conn_t *conn,
}
}

/* If any UDP sockets select true for reading, process them. */
static void read_udp_packets_fd(ares_channel_t *channel, ares_conn_t *conn,
const ares_timeval_t *now)
static void read_conn(ares_conn_t *conn, const ares_timeval_t *now)
{
size_t read_len;
ares_conn_err_t err = ARES_CONN_ERR_SUCCESS;
unsigned char buf[MAXENDSSZ + 1];

/* To reduce event loop overhead, read and process as many
* packets as we can. */
do {
if (conn->fd == ARES_SOCKET_BAD) {
err = ARES_CONN_ERR_FAILURE;
} else {
err = ares__conn_read(conn, (void *)buf, sizeof(buf), &read_len);
}

if (err == ARES_CONN_ERR_SUCCESS) {
process_answer(channel, buf, (size_t)read_len, conn, ARES_FALSE, now);
} else if (err != ARES_CONN_ERR_WOULDBLOCK) {
handle_conn_error(conn, ARES_TRUE, ARES_ECONNREFUSED);
return;
}

/* Try to read again only if *we* set up the socket, otherwise it may be
* a blocking socket and would cause recvfrom to hang. */
} while (err == ARES_CONN_ERR_SUCCESS && channel->sock_funcs == NULL);
/* TODO: There might be a potential issue here where there was a read that
* read some data, then looped and read again and got a disconnect.
* Right now, that would cause a resend instead of processing the data
* we have. This is fairly unlikely to occur due to only looping if
* a full buffer of 65535 bytes was read. */
if (read_conn_packets(conn) != ARES_SUCCESS) {
return;
}
read_answers(conn, now);
}

static void read_packets(ares_channel_t *channel, fd_set *read_fds,
static void process_read(ares_channel_t *channel, fd_set *read_fds,
ares_socket_t read_fd, const ares_timeval_t *now)
{
size_t i;
ares_socket_t *socketlist = NULL;
size_t num_sockets = 0;
ares_conn_t *conn = NULL;
ares__llist_node_t *node = NULL;

if (!read_fds && (read_fd == ARES_SOCKET_BAD)) {
Expand All @@ -512,13 +536,7 @@ static void read_packets(ares_channel_t *channel, fd_set *read_fds,
return;
}

conn = ares__llist_node_val(node);

if (conn->flags & ARES_CONN_FLAG_TCP) {
read_tcp_data(channel, conn, now);
} else {
read_udp_packets_fd(channel, conn, now);
}
read_conn(ares__llist_node_val(node), now);

return;
}
Expand Down Expand Up @@ -546,13 +564,7 @@ static void read_packets(ares_channel_t *channel, fd_set *read_fds,
return;
}

conn = ares__llist_node_val(node);

if (conn->flags & ARES_CONN_FLAG_TCP) {
read_tcp_data(channel, conn, now);
} else {
read_udp_packets_fd(channel, conn, now);
}
read_conn(ares__llist_node_val(node), now);
}

ares_free(socketlist);
Expand Down Expand Up @@ -616,7 +628,7 @@ static ares_status_t rewrite_without_edns(ares_query_t *query)
* the connection to be terminated after this call. */
static ares_status_t process_answer(ares_channel_t *channel,
const unsigned char *abuf, size_t alen,
ares_conn_t *conn, ares_bool_t tcp,
ares_conn_t *conn,
const ares_timeval_t *now)
{
ares_query_t *query;
Expand Down Expand Up @@ -680,6 +692,7 @@ static ares_status_t process_answer(ares_channel_t *channel,
if (ares_dns_record_get_rcode(rdnsrec) == ARES_RCODE_FORMERR &&
ares_dns_get_opt_rr_const(query->query) != NULL &&
ares_dns_get_opt_rr_const(rdnsrec) == NULL) {

status = rewrite_without_edns(query);
if (status != ARES_SUCCESS) {
end_query(channel, server, query, status, NULL);
Expand All @@ -695,7 +708,8 @@ static ares_status_t process_answer(ares_channel_t *channel,
* don't accept the packet, and switch the query to TCP if we hadn't
* done so already.
*/
if (ares_dns_record_get_flags(rdnsrec) & ARES_FLAG_TC && !tcp &&
if (ares_dns_record_get_flags(rdnsrec) & ARES_FLAG_TC &&
!(conn->flags & ARES_CONN_FLAG_TCP) &&
!(channel->flags & ARES_FLAG_IGNTC)) {
query->using_tcp = ARES_TRUE;
ares__send_query(query, now);
Expand Down

0 comments on commit 9277b59

Please sign in to comment.