Create new stream immediately on receiving new stream id.
Before the patch, full STREAM frame handling was delayed until the frame with zero offset is received. Only node in the streams tree was created. This lead to problems when such stream was deleted, in particular, it had no handlers set for read events. This patch creates new stream immediately, but delays data delivery until the proper offset will arrive. This is somewhat similar to how accept() operation works. The ngx_quic_add_stream() function is no longer needed and merged into stream handler. The ngx_quic_stream_input() now only handles frames for existing streams and does not deal with stream creation.
This commit is contained in:
parent
a99a268a5d
commit
fb07bd3fc1
1 changed files with 76 additions and 79 deletions
|
@ -172,8 +172,6 @@ static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
|
|||
ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
|
||||
static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
|
||||
ngx_quic_frame_t *frame);
|
||||
static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
|
||||
ngx_quic_stream_frame_t *f);
|
||||
|
||||
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
|
||||
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
|
||||
|
@ -1742,6 +1740,9 @@ static ngx_int_t
|
|||
ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
ngx_quic_frame_t *frame)
|
||||
{
|
||||
size_t n;
|
||||
ngx_buf_t *b;
|
||||
ngx_event_t *rev;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_quic_connection_t *qc;
|
||||
ngx_quic_stream_frame_t *f;
|
||||
|
@ -1753,10 +1754,66 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
|||
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
|
||||
|
||||
if (sn == NULL) {
|
||||
sn = ngx_quic_add_stream(c, f);
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
|
||||
|
||||
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
|
||||
? qc->tp.initial_max_stream_data_uni
|
||||
: qc->tp.initial_max_stream_data_bidi_remote;
|
||||
|
||||
if (n < NGX_QUIC_STREAM_BUFSIZE) {
|
||||
n = NGX_QUIC_STREAM_BUFSIZE;
|
||||
}
|
||||
|
||||
if (n < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: check IDs are increasing ? create all lower-numbered?
|
||||
*
|
||||
* 2.1. Stream Types and Identifiers
|
||||
*
|
||||
* Within each type, streams are created with numerically increasing
|
||||
* stream IDs. A stream ID that is used out of order results in all
|
||||
* streams of that type with lower-numbered stream IDs also being
|
||||
* opened.
|
||||
*/
|
||||
sn = ngx_quic_create_stream(c, f->stream_id, n);
|
||||
if (sn == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
rev = sn->c->read;
|
||||
|
||||
if (f->offset == 0) {
|
||||
|
||||
b = sn->b;
|
||||
b->last = ngx_cpymem(b->last, f->data, f->length);
|
||||
|
||||
sn->fs.received += f->length;
|
||||
|
||||
rev->ready = 1;
|
||||
|
||||
if (f->fin) {
|
||||
rev->pending_eof = 1;
|
||||
}
|
||||
|
||||
} else {
|
||||
rev->ready = 0;
|
||||
}
|
||||
|
||||
if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
|
||||
ngx_quic_handle_max_streams(c);
|
||||
}
|
||||
|
||||
qc->streams.handler(sn->c);
|
||||
|
||||
if (f->offset == 0) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* out-of-order stream: proceed to buffering */
|
||||
}
|
||||
|
||||
fs = &sn->fs;
|
||||
|
@ -1779,49 +1836,26 @@ ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
|
|||
f = &frame->u.stream;
|
||||
|
||||
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
|
||||
|
||||
if (sn == NULL) {
|
||||
// TODO: possible?
|
||||
// deleted while stream is in reordering queue?
|
||||
// stream was deleted while in reordering queue ?
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (sn->fs.received != 0) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
|
||||
b = sn->b;
|
||||
|
||||
if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if ((size_t) (b->end - b->last) < f->length) {
|
||||
b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
|
||||
b->pos = b->start;
|
||||
}
|
||||
|
||||
b->last = ngx_cpymem(b->last, f->data, f->length);
|
||||
|
||||
rev = sn->c->read;
|
||||
rev->ready = 1;
|
||||
|
||||
if (f->fin) {
|
||||
rev->pending_eof = 1;
|
||||
}
|
||||
|
||||
if (rev->active) {
|
||||
rev->handler(rev);
|
||||
}
|
||||
|
||||
/* check if stream was destroyed */
|
||||
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
|
||||
return NGX_DONE;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
|
||||
|
||||
b = sn->b;
|
||||
|
||||
if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if ((size_t) (b->end - b->last) < f->length) {
|
||||
b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
|
||||
b->pos = b->start;
|
||||
}
|
||||
|
||||
b->last = ngx_cpymem(b->last, f->data, f->length);
|
||||
|
||||
rev = sn->c->read;
|
||||
|
@ -1831,13 +1865,11 @@ ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
|
|||
rev->pending_eof = 1;
|
||||
}
|
||||
|
||||
if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
|
||||
ngx_quic_handle_max_streams(c);
|
||||
if (rev->active) {
|
||||
rev->handler(rev);
|
||||
}
|
||||
|
||||
qc->streams.handler(sn->c);
|
||||
|
||||
/* check if stream was destroyed */
|
||||
/* check if stream was destroyed by handler */
|
||||
if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
|
||||
return NGX_DONE;
|
||||
}
|
||||
|
@ -1846,41 +1878,6 @@ ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
|
|||
}
|
||||
|
||||
|
||||
static ngx_quic_stream_t *
|
||||
ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
|
||||
{
|
||||
size_t n;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
qc = c->quic;
|
||||
|
||||
// TODO: check increasing IDs
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
|
||||
|
||||
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
|
||||
? qc->tp.initial_max_stream_data_uni
|
||||
: qc->tp.initial_max_stream_data_bidi_remote;
|
||||
|
||||
if (n < NGX_QUIC_STREAM_BUFSIZE) {
|
||||
n = NGX_QUIC_STREAM_BUFSIZE;
|
||||
}
|
||||
|
||||
if (n < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sn = ngx_quic_create_stream(c, f->stream_id, n);
|
||||
if (sn == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return sn;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_quic_handle_max_streams(ngx_connection_t *c)
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue