Skip to content

Commit

Permalink
feat(ws): handle websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
jkuri committed Mar 15, 2018
1 parent 239da9a commit 462c9f3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 26 deletions.
2 changes: 1 addition & 1 deletion include/bproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static void write_buf(uv_stream_t *handle, char *data, int len);
void proxy_close_cb(uv_handle_t *peer);
void proxy_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf);
void proxy_connect_cb(uv_connect_t *req, int status);
void proxy_connection(char *ip, unsigned short port, conn_t *conn);
void proxy_http_request(char *ip, unsigned short port, conn_t *conn);

static void write_cb(uv_write_t *req, int status);
static void close_cb(uv_handle_t *peer);
Expand Down
2 changes: 2 additions & 0 deletions include/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ typedef struct conn_t
http_parser *parser;
http_request_t *request;
uv_stream_t *handle;
boolean ws_handshake_sent;
uv_stream_t *proxy_handle;
} conn_t;

int message_begin_cb(http_parser *p);
Expand Down
81 changes: 56 additions & 25 deletions src/bproxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ void conn_init(uv_stream_t *handle)
conn_t *conn = malloc(sizeof(conn_t));
conn->parser = malloc(sizeof(http_parser));
conn->request = malloc(sizeof(http_request_t));
conn->proxy_handle = malloc(sizeof(conn_t));
conn->proxy_handle = NULL;
conn->handle = handle;
handle->data = conn;
http_parser_init(conn->parser, HTTP_REQUEST);
Expand Down Expand Up @@ -46,7 +48,7 @@ void write_buf(uv_stream_t *handle, char *data, int len)
write_req_t *wr;
wr = (write_req_t *)malloc(sizeof *wr);
wr->buf = uv_buf_init((char *)data, len);
if (uv_write(&wr->req, handle, &wr->buf, 1, write_cb))
if (uv_is_writable((const uv_stream_t *)handle) && !uv_is_closing((const uv_handle_t *)handle) && uv_write(&wr->req, handle, &wr->buf, 1, write_cb))
{
fprintf(stderr, "write error: could not write to destination!\n");
return;
Expand Down Expand Up @@ -106,7 +108,10 @@ void proxy_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
fprintf(stderr, "error reading from socket!\n");
}
uv_close((uv_handle_t *)handle, proxy_close_cb);
if (uv_is_closing((const uv_handle_t *)handle))
{
uv_close((uv_handle_t *)handle, proxy_close_cb);
}
}

free(buf->base);
Expand All @@ -117,11 +122,15 @@ void proxy_connect_cb(uv_connect_t *req, int status)
proxy_t *proxy_conn = req->handle->data;
proxy_conn->connect_req = *req;

if (proxy_conn->conn->ws_handshake_sent) {
proxy_conn->conn->proxy_handle = req->handle;
}

uv_read_start(req->handle, alloc_cb, proxy_read_cb);
write_buf(req->handle, proxy_conn->conn->request->raw, strlen(proxy_conn->conn->request->raw));
}

void proxy_connection(char *ip, unsigned short port, conn_t *conn)
void proxy_http_request(char *ip, unsigned short port, conn_t *conn)
{
struct sockaddr_in dest;
uv_ip4_addr(ip, port, &dest);
Expand All @@ -130,8 +139,13 @@ void proxy_connection(char *ip, unsigned short port, conn_t *conn)
proxy_conn->conn = conn;
proxy_conn->tcp.data = proxy_conn;

if (conn->request->upgrade)
{
conn->ws_handshake_sent = true;
}

uv_tcp_init(server->loop, &proxy_conn->tcp);
// uv_tcp_keepalive(&proxy_conn->tcp, 1, 60);
uv_tcp_keepalive(&proxy_conn->tcp, 1, 60);
uv_tcp_connect(&proxy_conn->connect_req, &proxy_conn->tcp, (const struct sockaddr *)&dest, proxy_connect_cb);
}

Expand All @@ -141,39 +155,56 @@ void read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)

if (nread >= 0)
{
conn->request->raw = malloc(nread + 1);
memcpy(conn->request->raw, buf->base, nread);
conn->request->raw[nread] = '\0';

size_t np = http_parser_execute(conn->parser, &parser_settings, buf->base, nread);
if (np != nread)
if (!conn->ws_handshake_sent)
{
uv_shutdown_t *req;
req = (uv_shutdown_t *)malloc(sizeof *req);
uv_shutdown(req, handle, shutdown_cb);
}
conn->request->raw = malloc(nread + 1);
memcpy(conn->request->raw, buf->base, nread);
conn->request->raw[nread] = '\0';

fprintf(stderr, "[http]: %s\n", conn->request->url);
size_t np = http_parser_execute(conn->parser, &parser_settings, buf->base, nread);
if (np != nread)
{
uv_shutdown_t *req;
req = (uv_shutdown_t *)malloc(sizeof *req);
uv_shutdown(req, handle, shutdown_cb);
}

proxy_ip_port ip_port = find_proxy_config(conn->request->hostname);
if (!ip_port.ip || !ip_port.port)
proxy_ip_port ip_port = find_proxy_config(conn->request->hostname);
if (!ip_port.ip || !ip_port.port)
{
char *resp = malloc(1024 * sizeof(char));
http_404_response(resp);
write_buf(conn->handle, resp, strlen(resp));
free(resp);
return;
}

if (!conn->parser->upgrade)
{
fprintf(stderr, "[http]: %s\n", conn->request->url);
}
else
{
fprintf(stderr, "[ws]: %s\n", conn->request->url);
}

proxy_http_request(ip_port.ip, ip_port.port, conn);
}
else if (conn->proxy_handle)
{
char *resp = malloc(1024 * sizeof(char));
http_404_response(resp);
write_buf(conn->handle, resp, strlen(resp));
free(resp);
return;
write_buf(conn->proxy_handle, buf->base, nread);
}

proxy_connection(ip_port.ip, ip_port.port, conn);
}
else
{
if (nread != UV_EOF)
{
fprintf(stderr, "error reading from socket!\n");
}
uv_close((uv_handle_t *)handle, close_cb);
if (uv_is_closing((const uv_handle_t *)handle))
{
uv_close((uv_handle_t *)handle, close_cb);
}
}

free(buf->base);
Expand Down

0 comments on commit 462c9f3

Please sign in to comment.