Added reordering support for STREAM frames.

Each stream node now includes incoming frames queue and sent/received counters
for tracking offset. The sent counter is not used, c->sent is used, not like
in crypto buffers, which have no connections.
This commit is contained in:
Vladimir Homutov 2020-04-15 11:11:54 +03:00
parent 30f51174ec
commit 0a59aa67e4
2 changed files with 159 additions and 72 deletions

View file

@ -70,15 +70,6 @@ typedef struct {
} ngx_quic_send_ctx_t;
/* ordered frames stream context */
typedef struct {
uint64_t sent;
uint64_t received;
ngx_queue_t frames;
size_t total; /* size of buffered data */
} ngx_quic_frames_stream_t;
struct ngx_quic_connection_s {
ngx_str_t scid;
ngx_str_t dcid;
@ -177,7 +168,12 @@ static ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
ngx_quic_stream_frame_t *f);
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
@ -739,6 +735,7 @@ ngx_quic_close_connection(ngx_connection_t *c)
#if (NGX_DEBUG)
ngx_uint_t ns;
#endif
ngx_uint_t i;
ngx_pool_t *pool;
ngx_event_t *rev;
ngx_rbtree_t *tree;
@ -748,11 +745,14 @@ ngx_quic_close_connection(ngx_connection_t *c)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection");
// TODO: free frames from reorder queue if any
qc = c->quic;
if (qc) {
for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) {
ngx_quic_free_frames(c, &qc->crypto[i].frames);
}
qc->closing = 1;
tree = &qc->streams.tree;
@ -1201,9 +1201,7 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7:
if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream)
!= NGX_OK)
{
if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) {
return NGX_ERROR;
}
@ -1441,6 +1439,7 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
{
size_t full_len;
ngx_int_t rc;
ngx_queue_t *q;
ngx_quic_ordered_frame_t *f;
@ -1468,10 +1467,17 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */
if (handler(c, frame) != NGX_OK) {
rc = handler(c, frame);
if (rc == NGX_ERROR) {
return NGX_ERROR;
} else if (rc == NGX_DONE) {
/* handler destroyed stream, queue no longer exists */
return NGX_OK;
}
/* rc == NGX_OK */
fs->received += f->length;
/* now check the queue if we can continue with buffered frames */
@ -1512,8 +1518,14 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */
if (handler(c, frame) != NGX_OK) {
rc = handler(c, frame);
if (rc == NGX_ERROR) {
return NGX_ERROR;
} else if (rc == NGX_DONE) {
/* handler destroyed stream, queue no longer exists */
return NGX_OK;
}
fs->received += f->length;
@ -1721,20 +1733,54 @@ ngx_quic_crypto_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
static ngx_int_t
ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
ngx_quic_frame_t *frame)
{
size_t n;
ngx_buf_t *b;
ngx_event_t *rev;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
ngx_quic_stream_frame_t *f;
ngx_quic_frames_stream_t *fs;
qc = c->quic;
f = &frame->u.stream;
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
if (sn) {
if (sn == NULL) {
sn = ngx_quic_add_stream(c, f);
if (sn == NULL) {
return NGX_ERROR;
}
}
fs = &sn->fs;
return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
}
static ngx_int_t
ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
{
ngx_buf_t *b;
ngx_event_t *rev;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
ngx_quic_stream_frame_t *f;
qc = c->quic;
f = &frame->u.stream;
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
if (sn == NULL) {
// TODO: possible?
// deleted while stream is in reordering queue?
return NGX_ERROR;
}
if (sn->fs.received != 0) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b;
@ -1761,29 +1807,14 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
rev->handler(rev);
}
/* check if stream was destroyed */
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
return NGX_DONE;
}
return NGX_OK;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
? qc->tp.initial_max_stream_data_uni
: qc->tp.initial_max_stream_data_bidi_remote;
if (n < NGX_QUIC_STREAM_BUFSIZE) {
n = NGX_QUIC_STREAM_BUFSIZE;
}
if (n < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NGX_ERROR;
}
sn = ngx_quic_create_stream(c, f->stream_id, n);
if (sn == NULL) {
return NGX_ERROR;
}
b = sn->b;
b->last = ngx_cpymem(b->last, f->data, f->length);
@ -1800,10 +1831,50 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
qc->streams.handler(sn->c);
/* check if stream was destroyed */
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
return NGX_DONE;
}
return NGX_OK;
}
static ngx_quic_stream_t *
ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
{
size_t n;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
qc = c->quic;
// TODO: check increasing IDs
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
? qc->tp.initial_max_stream_data_uni
: qc->tp.initial_max_stream_data_bidi_remote;
if (n < NGX_QUIC_STREAM_BUFSIZE) {
n = NGX_QUIC_STREAM_BUFSIZE;
}
if (n < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NULL;
}
sn = ngx_quic_create_stream(c, f->stream_id, n);
if (sn == NULL) {
return NULL;
}
return sn;
}
static ngx_int_t
ngx_quic_handle_max_streams(ngx_connection_t *c)
{
@ -2024,7 +2095,6 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
return NGX_ERROR;
}
} while (q != ngx_queue_sentinel(&ctx->frames));
return NGX_OK;
@ -2037,15 +2107,19 @@ ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames)
ngx_queue_t *q;
ngx_quic_frame_t *f;
q = ngx_queue_head(frames);
do {
q = ngx_queue_head(frames);
if (q == ngx_queue_sentinel(frames)) {
break;
}
ngx_queue_remove(q);
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
q = ngx_queue_next(q);
ngx_quic_free_frame(c, f);
} while (q != ngx_queue_sentinel(frames));
} while (1);
}
@ -2237,7 +2311,7 @@ ngx_quic_retransmit_handler(ngx_event_t *ev)
static void
ngx_quic_push_handler(ngx_event_t *ev)
{
ngx_connection_t *c;
ngx_connection_t *c;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer");
@ -2430,6 +2504,8 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
return NULL;
}
ngx_queue_init(&sn->fs.frames);
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_destroy_pool(pool);
@ -2595,6 +2671,8 @@ ngx_quic_stream_cleanup_handler(void *data)
return;
}
ngx_quic_free_frames(pc, &qs->fs.frames);
if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) {
/* do not send fin for client unidirectional streams */
return;

View file

@ -29,33 +29,42 @@
typedef struct {
/* configurable */
ngx_msec_t max_idle_timeout;
ngx_msec_t max_ack_delay;
ngx_msec_t max_idle_timeout;
ngx_msec_t max_ack_delay;
ngx_uint_t max_packet_size;
ngx_uint_t initial_max_data;
ngx_uint_t initial_max_stream_data_bidi_local;
ngx_uint_t initial_max_stream_data_bidi_remote;
ngx_uint_t initial_max_stream_data_uni;
ngx_uint_t initial_max_streams_bidi;
ngx_uint_t initial_max_streams_uni;
ngx_uint_t ack_delay_exponent;
ngx_uint_t disable_active_migration;
ngx_uint_t active_connection_id_limit;
ngx_uint_t max_packet_size;
ngx_uint_t initial_max_data;
ngx_uint_t initial_max_stream_data_bidi_local;
ngx_uint_t initial_max_stream_data_bidi_remote;
ngx_uint_t initial_max_stream_data_uni;
ngx_uint_t initial_max_streams_bidi;
ngx_uint_t initial_max_streams_uni;
ngx_uint_t ack_delay_exponent;
ngx_uint_t disable_active_migration;
ngx_uint_t active_connection_id_limit;
/* TODO */
ngx_uint_t original_connection_id;
u_char stateless_reset_token[16];
void *preferred_address;
ngx_uint_t original_connection_id;
u_char stateless_reset_token[16];
void *preferred_address;
} ngx_quic_tp_t;
typedef struct {
uint64_t sent;
uint64_t received;
ngx_queue_t frames; /* reorder queue */
size_t total; /* size of buffered data */
} ngx_quic_frames_stream_t;
struct ngx_quic_stream_s {
ngx_rbtree_node_t node;
ngx_connection_t *parent;
ngx_connection_t *c;
uint64_t id;
ngx_buf_t *b;
ngx_rbtree_node_t node;
ngx_connection_t *parent;
ngx_connection_t *c;
uint64_t id;
ngx_buf_t *b;
ngx_quic_frames_stream_t fs;
};