Respect MAX_DATA and MAX_STREAM_DATA from QUIC client.
This commit is contained in:
parent
fa1795919c
commit
b93e22b5fd
2 changed files with 156 additions and 18 deletions
|
@ -48,8 +48,10 @@ typedef struct {
|
|||
|
||||
ngx_uint_t id_counter;
|
||||
|
||||
uint64_t total_received;
|
||||
uint64_t max_data;
|
||||
uint64_t received;
|
||||
uint64_t sent;
|
||||
uint64_t recv_max_data;
|
||||
uint64_t send_max_data;
|
||||
} ngx_quic_streams_t;
|
||||
|
||||
|
||||
|
@ -112,7 +114,6 @@ struct ngx_quic_connection_s {
|
|||
|
||||
ngx_quic_streams_t streams;
|
||||
ngx_quic_congestion_t congestion;
|
||||
ngx_uint_t max_data;
|
||||
|
||||
uint64_t cur_streams;
|
||||
uint64_t max_streams;
|
||||
|
@ -201,10 +202,14 @@ static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
|
|||
ngx_quic_frame_t *frame);
|
||||
|
||||
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
|
||||
static ngx_int_t ngx_quic_handle_max_data_frame(ngx_connection_t *c,
|
||||
ngx_quic_max_data_frame_t *f);
|
||||
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
|
||||
static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
|
||||
static ngx_int_t ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f);
|
||||
|
||||
static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
|
||||
ngx_quic_frame_t *frame);
|
||||
|
@ -599,7 +604,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_tp_t *tp,
|
|||
ctp->ack_delay_exponent = NGX_QUIC_DEFAULT_ACK_DELAY_EXPONENT;
|
||||
ctp->max_ack_delay = NGX_QUIC_DEFAULT_MAX_ACK_DELAY;
|
||||
|
||||
qc->streams.max_data = qc->tp.initial_max_data;
|
||||
qc->streams.recv_max_data = qc->tp.initial_max_data;
|
||||
|
||||
qc->congestion.window = ngx_min(10 * qc->tp.max_packet_size,
|
||||
ngx_max(2 * qc->tp.max_packet_size, 14720));
|
||||
|
@ -1416,7 +1421,12 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
|
|||
break;
|
||||
|
||||
case NGX_QUIC_FT_MAX_DATA:
|
||||
c->quic->max_data = frame.u.max_data.max_data;
|
||||
|
||||
if (ngx_quic_handle_max_data_frame(c, &frame.u.max_data) != NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ack_this = 1;
|
||||
break;
|
||||
|
||||
|
@ -1445,6 +1455,18 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
|
|||
ack_this = 1;
|
||||
break;
|
||||
|
||||
case NGX_QUIC_FT_MAX_STREAM_DATA:
|
||||
|
||||
if (ngx_quic_handle_max_stream_data_frame(c, pkt,
|
||||
&frame.u.max_stream_data)
|
||||
!= NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ack_this = 1;
|
||||
break;
|
||||
|
||||
case NGX_QUIC_FT_NEW_CONNECTION_ID:
|
||||
case NGX_QUIC_FT_RETIRE_CONNECTION_ID:
|
||||
case NGX_QUIC_FT_NEW_TOKEN:
|
||||
|
@ -1452,7 +1474,6 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
|
|||
case NGX_QUIC_FT_STOP_SENDING:
|
||||
case NGX_QUIC_FT_PATH_CHALLENGE:
|
||||
case NGX_QUIC_FT_PATH_RESPONSE:
|
||||
case NGX_QUIC_FT_MAX_STREAM_DATA:
|
||||
|
||||
/* TODO: handle */
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
|
@ -2207,6 +2228,45 @@ ngx_quic_handle_max_streams(ngx_connection_t *c)
|
|||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_quic_handle_max_data_frame(ngx_connection_t *c,
|
||||
ngx_quic_max_data_frame_t *f)
|
||||
{
|
||||
ngx_event_t *wev;
|
||||
ngx_rbtree_t *tree;
|
||||
ngx_rbtree_node_t *node;
|
||||
ngx_quic_stream_t *qs;
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
qc = c->quic;
|
||||
tree = &qc->streams.tree;
|
||||
|
||||
if (f->max_data <= qc->streams.send_max_data) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (qc->streams.sent >= qc->streams.send_max_data) {
|
||||
|
||||
for (node = ngx_rbtree_min(tree->root, tree->sentinel);
|
||||
node;
|
||||
node = ngx_rbtree_next(tree, node))
|
||||
{
|
||||
qs = (ngx_quic_stream_t *) node;
|
||||
wev = qs->c->write;
|
||||
|
||||
if (wev->active) {
|
||||
wev->ready = 1;
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
qc->streams.send_max_data = f->max_data;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
|
||||
|
@ -2279,6 +2339,44 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
|||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
|
||||
ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
|
||||
{
|
||||
uint64_t sent;
|
||||
ngx_event_t *wev;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
qc = c->quic;
|
||||
sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
|
||||
|
||||
if (sn == NULL) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (f->limit <= sn->send_max_data) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
sent = sn->c->sent;
|
||||
|
||||
if (sent >= sn->send_max_data) {
|
||||
wev = sn->c->write;
|
||||
|
||||
if (wev->active) {
|
||||
wev->ready = 1;
|
||||
ngx_post_event(wev, &ngx_posted_events);
|
||||
}
|
||||
}
|
||||
|
||||
sn->send_max_data = f->limit;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
|
||||
{
|
||||
|
@ -2810,10 +2908,13 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
|
|||
static ngx_quic_stream_t *
|
||||
ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
{
|
||||
ngx_log_t *log;
|
||||
ngx_pool_t *pool;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_pool_cleanup_t *cln;
|
||||
ngx_log_t *log;
|
||||
ngx_pool_t *pool;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_pool_cleanup_t *cln;
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
qc = c->quic;
|
||||
|
||||
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
|
||||
if (pool == NULL) {
|
||||
|
@ -2877,6 +2978,19 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
|||
sn->c->write->ready = 1;
|
||||
}
|
||||
|
||||
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
sn->send_max_data = qc->ctp.initial_max_stream_data_uni;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
|
||||
} else {
|
||||
sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
|
||||
}
|
||||
}
|
||||
|
||||
cln = ngx_pool_cleanup_add(pool, 0);
|
||||
if (cln == NULL) {
|
||||
ngx_close_connection(sn->c);
|
||||
|
@ -2932,7 +3046,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
ngx_memcpy(buf, b->pos, len);
|
||||
|
||||
b->pos += len;
|
||||
qc->streams.total_received += len;
|
||||
qc->streams.received += len;
|
||||
|
||||
if (b->pos == b->last) {
|
||||
b->pos = b->start;
|
||||
|
@ -2963,7 +3077,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
ngx_quic_queue_frame(pc->quic, frame);
|
||||
}
|
||||
|
||||
if ((qc->streams.max_data / 2) < qc->streams.total_received) {
|
||||
if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
|
||||
|
||||
frame = ngx_quic_alloc_frame(pc, 0);
|
||||
|
||||
|
@ -2971,11 +3085,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
qc->streams.max_data *= 2;
|
||||
qc->streams.recv_max_data *= 2;
|
||||
|
||||
frame->level = ssl_encryption_application;
|
||||
frame->type = NGX_QUIC_FT_MAX_DATA;
|
||||
frame->u.max_data.max_data = qc->streams.max_data;
|
||||
frame->u.max_data.max_data = qc->streams.recv_max_data;
|
||||
|
||||
ngx_sprintf(frame->info, "MAX_DATA max_data:%d level=%d on recv",
|
||||
(int) frame->u.max_data.max_data, frame->level);
|
||||
|
@ -2984,7 +3098,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream id 0x%xi recv: increased max data: %ui",
|
||||
qs->id, qc->streams.max_data);
|
||||
qs->id, qc->streams.recv_max_data);
|
||||
}
|
||||
|
||||
return len;
|
||||
|
@ -3024,6 +3138,10 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
sent = c->sent;
|
||||
unacked = sent - qs->acked;
|
||||
|
||||
if (qc->streams.send_max_data == 0) {
|
||||
qc->streams.send_max_data = qc->ctp.initial_max_data;
|
||||
}
|
||||
|
||||
if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic send hit buffer size");
|
||||
|
@ -3033,6 +3151,24 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
len = NGX_QUIC_STREAM_BUFSIZE - unacked;
|
||||
}
|
||||
|
||||
if (qc->streams.sent >= qc->streams.send_max_data) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic send hit MAX_DATA");
|
||||
len = 0;
|
||||
|
||||
} else if (qc->streams.sent + len > qc->streams.send_max_data) {
|
||||
len = qc->streams.send_max_data - qc->streams.sent;
|
||||
}
|
||||
|
||||
if (sent >= qs->send_max_data) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic send hit MAX_STREAM_DATA");
|
||||
len = 0;
|
||||
|
||||
} else if (sent + len > qs->send_max_data) {
|
||||
len = qs->send_max_data - sent;
|
||||
}
|
||||
|
||||
p = (u_char *) buf;
|
||||
end = (u_char *) buf + len;
|
||||
n = 0;
|
||||
|
@ -3061,6 +3197,7 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
frame->u.stream.data = frame->data;
|
||||
|
||||
c->sent += fsize;
|
||||
qc->streams.sent += fsize;
|
||||
p += fsize;
|
||||
n += fsize;
|
||||
|
||||
|
@ -3070,9 +3207,9 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
|
|||
ngx_quic_queue_frame(qc, frame);
|
||||
}
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream send %uz sent:%O, unacked:%uL",
|
||||
n, c->sent, (uint64_t) c->sent - qs->acked);
|
||||
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic send %uz of %uz, sent:%O, unacked:%uL",
|
||||
n, size, c->sent, (uint64_t) c->sent - qs->acked);
|
||||
|
||||
if (n != size) {
|
||||
c->write->ready = 0;
|
||||
|
|
|
@ -71,6 +71,7 @@ struct ngx_quic_stream_s {
|
|||
ngx_connection_t *c;
|
||||
uint64_t id;
|
||||
uint64_t acked;
|
||||
uint64_t send_max_data;
|
||||
ngx_buf_t *b;
|
||||
ngx_quic_frames_stream_t fs;
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue