Skip to content

implement reliable reset extension #550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion include/quicly.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ typedef struct st_quicly_transport_parameters_t {
*
*/
uint8_t disable_active_migration : 1;
/**
*
*/
uint8_t reliable_stream_reset : 1;
/**
*
*/
Expand Down Expand Up @@ -528,7 +532,7 @@ struct st_quicly_conn_streamgroup_state_t {
uint64_t padding, ping, ack, reset_stream, stop_sending, crypto, new_token, stream, max_data, max_stream_data, \
max_streams_bidi, max_streams_uni, data_blocked, stream_data_blocked, streams_blocked, new_connection_id, \
retire_connection_id, path_challenge, path_response, transport_close, application_close, handshake_done, datagram, \
ack_frequency; \
ack_frequency, reset_stream_at; \
} num_frames_sent, num_frames_received; \
/** \
* Total number of PTOs observed during the connection. \
Expand Down Expand Up @@ -806,6 +810,10 @@ struct st_quicly_stream_t {
quicly_linklist_t control; /* links to conn_t::control (or to conn_t::streams_blocked if the blocked flag is set) */
quicly_linklist_t default_scheduler;
} pending_link;
/**
* if the stream is closed using reliable reset
*/
unsigned is_reliable_reset : 1;
} _send_aux;
/**
*
Expand Down Expand Up @@ -1212,6 +1220,10 @@ int quicly_get_or_open_stream(quicly_conn_t *conn, uint64_t stream_id, quicly_st
*
*/
void quicly_reset_stream(quicly_stream_t *stream, int err);
/**
*
*/
int quicly_reset_stream_reliable(quicly_stream_t *stream, uint64_t reliable_size, int err);
/**
*
*/
Expand Down
34 changes: 27 additions & 7 deletions include/quicly/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
extern "C" {
#endif

#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
Expand All @@ -37,7 +38,7 @@ extern "C" {
#define QUICLY_FRAME_TYPE_PING 1
#define QUICLY_FRAME_TYPE_ACK 2
#define QUICLY_FRAME_TYPE_ACK_ECN 3
#define QUICLY_FRAME_TYPE_RESET_STREAM 4 /* RESET_STREAM */
#define QUICLY_FRAME_TYPE_RESET_STREAM 4
#define QUICLY_FRAME_TYPE_STOP_SENDING 5
#define QUICLY_FRAME_TYPE_CRYPTO 6
#define QUICLY_FRAME_TYPE_NEW_TOKEN 7
Expand All @@ -57,6 +58,7 @@ extern "C" {
#define QUICLY_FRAME_TYPE_TRANSPORT_CLOSE 28
#define QUICLY_FRAME_TYPE_APPLICATION_CLOSE 29
#define QUICLY_FRAME_TYPE_HANDSHAKE_DONE 30
#define QUICLY_FRAME_TYPE_RESET_STREAM_AT 32
#define QUICLY_FRAME_TYPE_DATAGRAM_NOLEN 48
#define QUICLY_FRAME_TYPE_DATAGRAM_WITHLEN 49
#define QUICLY_FRAME_TYPE_ACK_FREQUENCY 0xaf
Expand All @@ -70,7 +72,7 @@ extern "C" {
#define QUICLY_MAX_STREAM_DATA_FRAME_CAPACITY (1 + 8 + 8)
#define QUICLY_MAX_STREAMS_FRAME_CAPACITY (1 + 8)
#define QUICLY_PING_FRAME_CAPACITY 1
#define QUICLY_RST_FRAME_CAPACITY (1 + 8 + 8 + 8)
#define QUICLY_RST_FRAME_CAPACITY (8 + 8 + 8 + 8 + 8) /* for RESET_STREAM_AT allocate space for type and reliable_size */
#define QUICLY_DATA_BLOCKED_FRAME_CAPACITY (1 + 8)
#define QUICLY_STREAM_DATA_BLOCKED_FRAME_CAPACITY (1 + 8 + 8)
#define QUICLY_STREAMS_BLOCKED_FRAME_CAPACITY (1 + 8)
Expand Down Expand Up @@ -109,15 +111,18 @@ static int quicly_decode_stream_frame(uint8_t type_flags, const uint8_t **src, c
static uint8_t *quicly_encode_crypto_frame_header(uint8_t *dst, uint8_t *dst_end, uint64_t offset, size_t *data_len);
static int quicly_decode_crypto_frame(const uint8_t **src, const uint8_t *end, quicly_stream_frame_t *frame);

static uint8_t *quicly_encode_reset_stream_frame(uint8_t *dst, uint64_t stream_id, uint16_t app_error_code, uint64_t final_size);
static uint8_t *quicly_encode_reset_stream_frame(uint8_t *dst, uint64_t stream_id, uint16_t app_error_code, uint64_t final_size,
uint64_t reliable_size);

typedef struct st_quicly_reset_stream_frame_t {
uint64_t stream_id;
uint16_t app_error_code;
uint64_t final_size;
uint64_t reliable_size;
} quicly_reset_stream_frame_t;

static int quicly_decode_reset_stream_frame(const uint8_t **src, const uint8_t *end, quicly_reset_stream_frame_t *frame);
static int quicly_decode_reset_stream_frame(uint64_t frame_type, const uint8_t **src, const uint8_t *end,
quicly_reset_stream_frame_t *frame);

typedef struct st_quicly_transport_close_frame_t {
uint16_t error_code;
Expand Down Expand Up @@ -449,16 +454,24 @@ inline int quicly_decode_crypto_frame(const uint8_t **src, const uint8_t *end, q
return QUICLY_TRANSPORT_ERROR_FRAME_ENCODING;
}

inline uint8_t *quicly_encode_reset_stream_frame(uint8_t *dst, uint64_t stream_id, uint16_t app_error_code, uint64_t final_size)
inline uint8_t *quicly_encode_reset_stream_frame(uint8_t *dst, uint64_t stream_id, uint16_t app_error_code, uint64_t final_size,
uint64_t reliable_size)
{
*dst++ = QUICLY_FRAME_TYPE_RESET_STREAM;
if (reliable_size == 0) {
*dst++ = QUICLY_FRAME_TYPE_RESET_STREAM;
} else {
dst = quicly_encodev(dst, QUICLY_FRAME_TYPE_RESET_STREAM_AT);
}
dst = quicly_encodev(dst, stream_id);
dst = quicly_encodev(dst, app_error_code);
dst = quicly_encodev(dst, final_size);
if (reliable_size != 0)
dst = quicly_encodev(dst, reliable_size);
return dst;
}

inline int quicly_decode_reset_stream_frame(const uint8_t **src, const uint8_t *end, quicly_reset_stream_frame_t *frame)
inline int quicly_decode_reset_stream_frame(uint64_t frame_type, const uint8_t **src, const uint8_t *end,
quicly_reset_stream_frame_t *frame)
{
uint64_t error_code;

Expand All @@ -468,6 +481,13 @@ inline int quicly_decode_reset_stream_frame(const uint8_t **src, const uint8_t *
goto Error;
frame->app_error_code = (uint16_t)error_code;
frame->final_size = quicly_decodev(src, end);
if (frame_type != QUICLY_FRAME_TYPE_RESET_STREAM) {
assert(frame_type == QUICLY_FRAME_TYPE_RESET_STREAM_AT);
frame->reliable_size = quicly_decodev(src, end);
} else {
frame->reliable_size = 0;
}

return 0;
Error:
return QUICLY_TRANSPORT_ERROR_FRAME_ENCODING;
Expand Down
6 changes: 5 additions & 1 deletion include/quicly/recvstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ typedef struct st_quicly_recvstate_t {
* end_of_stream offset (or UINT64_MAX)
*/
uint64_t eos;
/**
*
*/
uint64_t reliable_size;
} quicly_recvstate_t;

void quicly_recvstate_init(quicly_recvstate_t *state);
Expand All @@ -57,7 +61,7 @@ static size_t quicly_recvstate_bytes_available(quicly_recvstate_t *state);
* backward from the end of given range).
*/
int quicly_recvstate_update(quicly_recvstate_t *state, uint64_t off, size_t *len, int is_fin, size_t max_ranges);
int quicly_recvstate_reset(quicly_recvstate_t *state, uint64_t eos_at, uint64_t *bytes_missing);
int quicly_recvstate_reset(quicly_recvstate_t *state, uint64_t final_size, uint64_t reliable_size, uint64_t *bytes_missing);

/* inline definitions */

Expand Down
4 changes: 4 additions & 0 deletions include/quicly/streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ void quicly_streambuf_egress_emit(quicly_stream_t *stream, size_t off, void *dst
static int quicly_streambuf_egress_write(quicly_stream_t *stream, const void *src, size_t len);
static int quicly_streambuf_egress_write_vec(quicly_stream_t *stream, quicly_sendbuf_vec_t *vec);
int quicly_streambuf_egress_shutdown(quicly_stream_t *stream);
/**
* Resets the stream while making sure that all bytes up to `reliable_size` are received.
*/
int quicly_streambuf_egress_reset(quicly_stream_t *stream, uint64_t reliable_size, int err);
static void quicly_streambuf_ingress_shift(quicly_stream_t *stream, size_t delta);
static ptls_iovec_t quicly_streambuf_ingress_get(quicly_stream_t *stream);
/**
Expand Down
107 changes: 86 additions & 21 deletions lib/quicly.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#define QUICLY_TRANSPORT_PARAMETER_ID_INITIAL_SOURCE_CONNECTION_ID 15
#define QUICLY_TRANSPORT_PARAMETER_ID_RETRY_SOURCE_CONNECTION_ID 16
#define QUICLY_TRANSPORT_PARAMETER_ID_MAX_DATAGRAM_FRAME_SIZE 0x20
#define QUICLY_TRANSPORT_PARAMETER_ID_RELIABLE_STREAM_RESET 0x17f7586d2cb570
#define QUICLY_TRANSPORT_PARAMETER_ID_MIN_ACK_DELAY 0xff03de1a

/**
Expand Down Expand Up @@ -1108,6 +1109,7 @@ static void init_stream_properties(quicly_stream_t *stream, uint32_t initial_max
stream->_send_aux.blocked = QUICLY_SENDER_STATE_NONE;
quicly_linklist_init(&stream->_send_aux.pending_link.control);
quicly_linklist_init(&stream->_send_aux.pending_link.default_scheduler);
stream->_send_aux.is_reliable_reset = 0;

stream->_recv_aux.window = initial_max_stream_data_local;

Expand Down Expand Up @@ -1922,6 +1924,8 @@ int quicly_encode_transport_parameter_list(ptls_buffer_t *buf, const quicly_tran
}
if (params->disable_active_migration)
PUSH_TP(buf, QUICLY_TRANSPORT_PARAMETER_ID_DISABLE_ACTIVE_MIGRATION, {});
if (params->reliable_stream_reset)
PUSH_TP(buf, QUICLY_TRANSPORT_PARAMETER_ID_RELIABLE_STREAM_RESET, {});
if (QUICLY_LOCAL_ACTIVE_CONNECTION_ID_LIMIT != QUICLY_DEFAULT_ACTIVE_CONNECTION_ID_LIMIT)
PUSH_TP(buf, QUICLY_TRANSPORT_PARAMETER_ID_ACTIVE_CONNECTION_ID_LIMIT,
{ ptls_buffer_push_quicint(buf, QUICLY_LOCAL_ACTIVE_CONNECTION_ID_LIMIT); });
Expand Down Expand Up @@ -2126,6 +2130,7 @@ int quicly_decode_transport_parameter_list(quicly_transport_parameters_t *params
params->active_connection_id_limit = v;
});
DECODE_TP(QUICLY_TRANSPORT_PARAMETER_ID_DISABLE_ACTIVE_MIGRATION, { params->disable_active_migration = 1; });
DECODE_TP(QUICLY_TRANSPORT_PARAMETER_ID_RELIABLE_STREAM_RESET, { params->reliable_stream_reset = 1; });
DECODE_TP(QUICLY_TRANSPORT_PARAMETER_ID_MAX_DATAGRAM_FRAME_SIZE, {
uint64_t v;
if ((v = ptls_decode_quicint(&src, end)) == UINT64_MAX) {
Expand Down Expand Up @@ -3677,7 +3682,7 @@ static int send_control_frames_of_stream(quicly_stream_t *stream, quicly_send_co
on_ack_reset_stream)) != 0)
return ret;
s->dst = quicly_encode_reset_stream_frame(s->dst, stream->stream_id, stream->_send_aux.reset_stream.error_code,
stream->sendstate.size_inflight);
stream->sendstate.size_inflight, 0);
++stream->conn->super.stats.num_frames_sent.reset_stream;
QUICLY_PROBE(RESET_STREAM_SEND, stream->conn, stream->conn->stash.now, stream->stream_id,
stream->_send_aux.reset_stream.error_code, stream->sendstate.size_inflight);
Expand Down Expand Up @@ -3828,6 +3833,25 @@ int quicly_send_stream(quicly_stream_t *stream, quicly_send_context_t *s)
*dst++ = QUICLY_FRAME_TYPE_CRYPTO;
dst = quicly_encodev(dst, off);
len = s->dst_end - dst;
} else if (off == stream->sendstate.final_size && stream->_send_aux.is_reliable_reset) {
/* reliable reset is sent in a special way */
if ((ret = allocate_ack_eliciting_frame(stream->conn, s, QUICLY_RST_FRAME_CAPACITY, &sent, on_ack_stream)) != 0)
return ret;
s->dst = quicly_encode_reset_stream_frame(s->dst, stream->stream_id, stream->_send_aux.reset_stream.error_code,
stream->sendstate.final_size, stream->sendstate.final_size);
++stream->conn->super.stats.num_frames_sent.reset_stream_at;
QUICLY_PROBE(RESET_STREAM_AT_SEND, stream->conn, stream->conn->stash.now, stream->stream_id,
stream->_send_aux.reset_stream.error_code, stream->sendstate.final_size, stream->sendstate.final_size);
QUICLY_LOG_CONN(reset_stream_at_send, stream->conn, {
PTLS_LOG_ELEMENT_SIGNED(stream_id, stream->stream_id);
PTLS_LOG_ELEMENT_UNSIGNED(error_code, stream->_send_aux.reset_stream.error_code);
PTLS_LOG_ELEMENT_UNSIGNED(final_size, stream->sendstate.final_size);
PTLS_LOG_ELEMENT_UNSIGNED(reliable_size, stream->sendstate.final_size);
});
len = 0;
is_fin = 1;
wrote_all = 1;
goto UpdateState_AllFrames;
} else {
uint8_t header[18], *hp = header + 1;
hp = quicly_encodev(hp, stream->stream_id);
Expand Down Expand Up @@ -3909,8 +3933,13 @@ int quicly_send_stream(quicly_stream_t *stream, quicly_send_context_t *s)
if (off + len == stream->sendstate.final_size) {
assert(!quicly_sendstate_is_open(&stream->sendstate));
assert(s->dst != NULL);
is_fin = 1;
*s->dst |= QUICLY_FRAME_TYPE_STREAM_BIT_FIN;
if (stream->_send_aux.is_reliable_reset) {
is_fin = 0;
wrote_all = 0;
} else {
is_fin = 1;
*s->dst |= QUICLY_FRAME_TYPE_STREAM_BIT_FIN;
}
} else {
is_fin = 0;
}
Expand All @@ -3935,8 +3964,9 @@ int quicly_send_stream(quicly_stream_t *stream, quicly_send_context_t *s)
PTLS_LOG_ELEMENT_UNSIGNED(len, len);
PTLS_LOG_ELEMENT_BOOL(is_fin, is_fin);
});

QUICLY_PROBE(QUICTRACE_SEND_STREAM, stream->conn, stream->conn->stash.now, stream, off, len, is_fin);

UpdateState_AllFrames:
/* update sendstate (and also MAX_DATA counter) */
if (stream->sendstate.size_inflight < off + len) {
if (stream->stream_id >= 0)
Expand Down Expand Up @@ -5398,21 +5428,35 @@ static int handle_reset_stream_frame(quicly_conn_t *conn, struct st_quicly_handl
quicly_stream_t *stream;
int ret;

if ((ret = quicly_decode_reset_stream_frame(&state->src, state->end, &frame)) != 0)
if ((ret = quicly_decode_reset_stream_frame(state->frame_type, &state->src, state->end, &frame)) != 0)
return ret;
QUICLY_PROBE(RESET_STREAM_RECEIVE, conn, conn->stash.now, frame.stream_id, frame.app_error_code, frame.final_size);
QUICLY_LOG_CONN(reset_stream_receive, conn, {
PTLS_LOG_ELEMENT_SIGNED(stream_id, (quicly_stream_id_t)frame.stream_id);
PTLS_LOG_ELEMENT_UNSIGNED(app_error_code, frame.app_error_code);
PTLS_LOG_ELEMENT_UNSIGNED(final_size, frame.final_size);
});
switch (state->frame_type) {
case QUICLY_FRAME_TYPE_RESET_STREAM:
QUICLY_PROBE(RESET_STREAM_RECEIVE, conn, conn->stash.now, frame.stream_id, frame.app_error_code, frame.final_size);
QUICLY_LOG_CONN(reset_stream_receive, conn, {
PTLS_LOG_ELEMENT_SIGNED(stream_id, (quicly_stream_id_t)frame.stream_id);
PTLS_LOG_ELEMENT_UNSIGNED(app_error_code, frame.app_error_code);
PTLS_LOG_ELEMENT_UNSIGNED(final_size, frame.final_size);
});
break;
case QUICLY_FRAME_TYPE_RESET_STREAM_AT:
QUICLY_PROBE(RESET_STREAM_AT_RECEIVE, conn, conn->stash.now, frame.stream_id, frame.app_error_code, frame.final_size,
frame.reliable_size);
QUICLY_LOG_CONN(reset_stream_at_receive, conn, {
PTLS_LOG_ELEMENT_SIGNED(stream_id, (quicly_stream_id_t)frame.stream_id);
PTLS_LOG_ELEMENT_UNSIGNED(app_error_code, frame.app_error_code);
PTLS_LOG_ELEMENT_UNSIGNED(final_size, frame.final_size);
PTLS_LOG_ELEMENT_UNSIGNED(reliable_size, frame.reliable_size);
});
break;
}

if ((ret = quicly_get_or_open_stream(conn, frame.stream_id, &stream)) != 0 || stream == NULL)
return ret;

if (!quicly_recvstate_transfer_complete(&stream->recvstate)) {
uint64_t bytes_missing;
if ((ret = quicly_recvstate_reset(&stream->recvstate, frame.final_size, &bytes_missing)) != 0)
if ((ret = quicly_recvstate_reset(&stream->recvstate, frame.final_size, frame.reliable_size, &bytes_missing)) != 0)
return ret;
stream->conn->ingress.max_data.bytes_consumed += bytes_missing;
int err = QUICLY_ERROR_FROM_APPLICATION_ERROR_CODE(frame.app_error_code);
Expand All @@ -5431,6 +5475,11 @@ static int handle_reset_stream_frame(quicly_conn_t *conn, struct st_quicly_handl
return 0;
}

static int handle_reset_stream_at_frame(quicly_conn_t *conn, struct st_quicly_handle_payload_state_t *state)
{
return handle_reset_stream_frame(conn, state);
}

static int handle_ack_frame(quicly_conn_t *conn, struct st_quicly_handle_payload_state_t *state)
{
quicly_ack_frame_t frame;
Expand Down Expand Up @@ -6281,15 +6330,16 @@ static int handle_payload(quicly_conn_t *conn, size_t epoch, const uint8_t *_src
offsetof(quicly_conn_t, super.stats.num_frames_received.lc) \
}, \
}
/* +----------------------------------+-------------------+---------------+
* | frame | permitted epochs | |
* |------------------+---------------+----+----+----+----+ ack-eliciting |
* | upper-case | lower-case | IN | 0R | HS | 1R | |
* +------------------+---------------+----+----+----+----+---------------+ */
FRAME( DATAGRAM_NOLEN , datagram , 0 , 1, 0, 1 , 1 ),
FRAME( DATAGRAM_WITHLEN , datagram , 0 , 1, 0, 1 , 1 ),
FRAME( ACK_FREQUENCY , ack_frequency , 0 , 0 , 0 , 1 , 1 ),
/* +------------------+---------------+-------------------+---------------+ */
/* +------------------------------------+-------------------+---------------+
* | frame | permitted epochs | |
* |------------------+-----------------+----+----+----+----+ ack-eliciting |
* | upper-case | lower-case | IN | 0R | HS | 1R | |
* +------------------+-----------------+----+----+----+----+---------------+ */
FRAME( DATAGRAM_NOLEN , datagram , 0 , 1 , 0 , 1 , 1 ),
FRAME( DATAGRAM_WITHLEN , datagram , 0 , 1 , 0 , 1 , 1 ),
FRAME( RESET_STREAM_AT , reset_stream_at , 0 , 1 , 0 , 1 , 1 ),
FRAME( ACK_FREQUENCY , ack_frequency , 0 , 0 , 0 , 1 , 1 ),
/* +------------------+-----------------+-------------------+---------------+ */
#undef FRAME
{UINT64_MAX},
};
Expand Down Expand Up @@ -6835,6 +6885,21 @@ void quicly_reset_stream(quicly_stream_t *stream, int err)
resched_stream_data(stream);
}

int quicly_reset_stream_reliable(quicly_stream_t *stream, uint64_t reliable_size, int err)
{
assert(stream->sendstate.final_size == UINT64_MAX && stream->_send_aux.reset_stream.sender_state == QUICLY_SENDER_STATE_NONE &&
"reliable reset cannot be used after the stream is shutdown or reset");

/* for simplicity, reliable size is rounded up to `size_inflight`, then that value is set as `final_size` */
if (reliable_size < stream->sendstate.size_inflight)
reliable_size = stream->sendstate.size_inflight;

stream->_send_aux.reset_stream.error_code = QUICLY_ERROR_GET_ERROR_CODE(err);
stream->_send_aux.is_reliable_reset = 1;

return quicly_sendstate_shutdown(&stream->sendstate, reliable_size);
}

void quicly_request_stop(quicly_stream_t *stream, int err)
{
assert(quicly_stream_has_receive_side(quicly_is_client(stream->conn), stream->stream_id));
Expand Down
Loading