Proxy: support for connection upgrade (101 Switching Protocols).

This allows to proxy WebSockets by using configuration like this:

    location /chat/ {
        proxy_pass http://backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

Connection upgrade is allowed as long as it was requested by a client
via the Upgrade request header.
This commit is contained in:
Maxim Dounin 2013-02-18 13:50:52 +00:00
parent ec6e4ad6ab
commit 5f77a13f4d
8 changed files with 323 additions and 3 deletions

View file

@ -62,6 +62,7 @@ ngx_http_chunked_header_filter(ngx_http_request_t *r)
if (r->headers_out.status == NGX_HTTP_NOT_MODIFIED
|| r->headers_out.status == NGX_HTTP_NO_CONTENT
|| r->headers_out.status < NGX_HTTP_OK
|| r != r->main
|| (r->method & NGX_HTTP_HEAD))
{

View file

@ -1474,6 +1474,14 @@ ngx_http_proxy_process_header(ngx_http_request_t *r)
u->keepalive = !u->headers_in.connection_close;
}
if (u->headers_in.status_n == NGX_HTTP_SWITCHING_PROTOCOLS) {
u->keepalive = 0;
if (r->headers_in.upgrade) {
u->upgrade = 1;
}
}
return NGX_OK;
}

View file

@ -379,7 +379,10 @@ ngx_http_header_filter(ngx_http_request_t *r)
len += sizeof("Transfer-Encoding: chunked" CRLF) - 1;
}
if (r->keepalive) {
if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) {
len += sizeof("Connection: upgrade" CRLF) - 1;
} else if (r->keepalive) {
len += sizeof("Connection: keep-alive" CRLF) - 1;
/*
@ -548,7 +551,11 @@ ngx_http_header_filter(ngx_http_request_t *r)
sizeof("Transfer-Encoding: chunked" CRLF) - 1);
}
if (r->keepalive) {
if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) {
b->last = ngx_cpymem(b->last, "Connection: upgrade" CRLF,
sizeof("Connection: upgrade" CRLF) - 1);
} else if (r->keepalive) {
b->last = ngx_cpymem(b->last, "Connection: keep-alive" CRLF,
sizeof("Connection: keep-alive" CRLF) - 1);

View file

@ -130,6 +130,10 @@ ngx_http_header_t ngx_http_headers_in[] = {
offsetof(ngx_http_headers_in_t, expect),
ngx_http_process_unique_header_line },
{ ngx_string("Upgrade"),
offsetof(ngx_http_headers_in_t, upgrade),
ngx_http_process_header_line },
#if (NGX_HTTP_GZIP)
{ ngx_string("Accept-Encoding"),
offsetof(ngx_http_headers_in_t, accept_encoding),

View file

@ -64,6 +64,10 @@
#define NGX_HTTP_LOG_UNSAFE 8
#define NGX_HTTP_CONTINUE 100
#define NGX_HTTP_SWITCHING_PROTOCOLS 101
#define NGX_HTTP_PROCESSING 102
#define NGX_HTTP_OK 200
#define NGX_HTTP_CREATED 201
#define NGX_HTTP_ACCEPTED 202
@ -184,6 +188,7 @@ typedef struct {
ngx_table_elt_t *transfer_encoding;
ngx_table_elt_t *expect;
ngx_table_elt_t *upgrade;
#if (NGX_HTTP_GZIP)
ngx_table_elt_t *accept_encoding;

View file

@ -46,6 +46,16 @@ static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_send_response(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_upgrade(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u);
static void ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
ngx_uint_t from_upstream);
static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r);
static void
@ -1327,6 +1337,7 @@ ngx_http_upstream_reinit(ngx_http_request_t *r, ngx_http_upstream_t *u)
}
u->keepalive = 0;
u->upgrade = 0;
ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
u->headers_in.content_length_n = -1;
@ -2078,6 +2089,11 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
return;
}
if (u->upgrade) {
ngx_http_upstream_upgrade(r, u);
return;
}
c = r->connection;
if (r->header_only) {
@ -2360,6 +2376,278 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
}
static void
ngx_http_upstream_upgrade(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
int tcp_nodelay;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/* TODO: prevent upgrade if not requested or not possible */
r->keepalive = 0;
c->log->action = "proxying upgraded connection";
u->read_event_handler = ngx_http_upstream_upgraded_read_upstream;
u->write_event_handler = ngx_http_upstream_upgraded_write_upstream;
r->read_event_handler = ngx_http_upstream_upgraded_read_downstream;
r->write_event_handler = ngx_http_upstream_upgraded_write_downstream;
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
if (setsockopt(u->peer.connection->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(u->peer.connection, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
u->peer.connection->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (u->peer.connection->read->ready
|| u->buffer.pos != u->buffer.last)
{
ngx_http_upstream_process_upgraded(r, 1);
}
if (c->read->ready
|| r->header_in->pos != r->header_in->last)
{
ngx_http_upstream_process_upgraded(r, 0);
}
}
static void
ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r)
{
ngx_http_upstream_process_upgraded(r, 0);
}
static void
ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r)
{
ngx_http_upstream_process_upgraded(r, 1);
}
static void
ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_http_upstream_process_upgraded(r, 1);
}
static void
ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_http_upstream_process_upgraded(r, 0);
}
static void
ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
ngx_uint_t from_upstream)
{
size_t size;
ssize_t n;
ngx_buf_t *b;
ngx_uint_t do_write;
ngx_connection_t *c, *downstream, *upstream, *dst, *src;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
u = r->upstream;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upgraded, fu:%ui", from_upstream);
downstream = c;
upstream = u->peer.connection;
if (downstream->write->timedout) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
if (upstream->read->timedout || upstream->write->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (from_upstream) {
src = upstream;
dst = downstream;
b = &u->buffer;
} else {
src = downstream;
dst = upstream;
b = &u->from_client;
if (r->header_in->last > r->header_in->pos) {
b = r->header_in;
b->end = b->last;
do_write = 1;
}
if (b->start == NULL) {
b->start = ngx_palloc(r->pool, u->conf->buffer_size);
if (b->start == NULL) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
b->pos = b->start;
b->last = b->start;
b->end = b->start + u->conf->buffer_size;
b->temporary = 1;
b->tag = u->output.tag;
}
}
for ( ;; ) {
if (do_write) {
size = b->last - b->pos;
if (size && dst->write->ready) {
n = dst->send(dst, b->pos, size);
if (n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (n > 0) {
b->pos += n;
if (b->pos == b->last) {
b->pos = b->start;
b->last = b->start;
}
}
}
}
size = b->end - b->last;
if (size && src->read->ready) {
n = src->recv(src, b->last, size);
if (n == NGX_AGAIN || n == 0) {
break;
}
if (n > 0) {
do_write = 1;
b->last += n;
continue;
}
if (n == NGX_ERROR) {
src->read->eof = 1;
}
}
break;
}
if ((upstream->read->eof && u->buffer.pos == u->buffer.last)
|| (downstream->read->eof && u->from_client.pos == u->from_client.last)
|| (downstream->read->eof && upstream->read->eof))
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream upgraded done");
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (ngx_handle_write_event(upstream->write, u->conf->send_lowat)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (upstream->write->active && !upstream->write->ready) {
ngx_add_timer(upstream->write, u->conf->send_timeout);
} else if (upstream->write->timer_set) {
ngx_del_timer(upstream->write);
}
if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (upstream->read->active && !upstream->read->ready) {
ngx_add_timer(upstream->read, u->conf->read_timeout);
} else if (upstream->read->timer_set) {
ngx_del_timer(upstream->read);
}
if (ngx_handle_write_event(downstream->write, clcf->send_lowat)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (ngx_handle_read_event(downstream->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (downstream->write->active && !downstream->write->ready) {
ngx_add_timer(downstream->write, clcf->send_timeout);
} else if (downstream->write->timer_set) {
ngx_del_timer(downstream->write);
}
}
static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{

View file

@ -284,6 +284,8 @@ struct ngx_http_upstream_s {
ngx_http_upstream_resolved_t *resolved;
ngx_buf_t from_client;
ngx_buf_t buffer;
off_t length;
@ -329,6 +331,7 @@ struct ngx_http_upstream_s {
unsigned buffering:1;
unsigned keepalive:1;
unsigned upgrade:1;
unsigned request_sent:1;
unsigned header_sent:1;

View file

@ -1747,7 +1747,11 @@ ngx_http_variable_sent_connection(ngx_http_request_t *r,
size_t len;
char *p;
if (r->keepalive) {
if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) {
len = sizeof("upgrade") - 1;
p = "upgrade";
} else if (r->keepalive) {
len = sizeof("keep-alive") - 1;
p = "keep-alive";