Skip to content

Fix: Notify app of client disconnection when request is in progress. #1556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/nxt_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
/* Status report. */
nxt_port_handler_t status;

nxt_port_handler_t client_error;

nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
nxt_port_handler_t read_queue;
Expand Down Expand Up @@ -115,6 +117,8 @@ typedef enum {
_NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
_NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),

_NXT_PORT_MSG_CLIENT_ERROR = nxt_port_handler_idx(client_error),

_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
_NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
Expand Down Expand Up @@ -160,6 +164,8 @@ typedef enum {
NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),

NXT_PORT_MSG_CLIENT_ERROR = nxt_msg_last(_NXT_PORT_MSG_CLIENT_ERROR),

NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
Expand Down
11 changes: 10 additions & 1 deletion src/nxt_router.c
Original file line number Diff line number Diff line change
Expand Up @@ -5300,7 +5300,16 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);

if (r->req_rpc_data != NULL) {
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;

if (r->error) {
nxt_port_socket_write(task, req_rpc_data->app_port,
NXT_PORT_MSG_CLIENT_ERROR,
-1, req_rpc_data->stream,
task->thread->engine->port->id, NULL);
}

nxt_request_rpc_data_unlink(task, req_rpc_data);
}

nxt_http_request_close_handler(task, r, r->proto.any);
Expand Down
44 changes: 34 additions & 10 deletions src/nxt_unit.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_client_error(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
Expand Down Expand Up @@ -1121,6 +1123,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;

case _NXT_PORT_MSG_CLIENT_ERROR:
rc = nxt_unit_process_client_error(ctx, &recv_msg);
break;

case _NXT_PORT_MSG_REMOVE_PID:
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
Expand Down Expand Up @@ -1377,18 +1383,16 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,

lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

res = nxt_unit_request_hash_add(ctx, req);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "failed to add request to hash");
nxt_unit_request_done(req, NXT_UNIT_ERROR);
return NXT_UNIT_ERROR;
}

if (req->content_length
> (uint64_t) (req->content_buf->end - req->content_buf->free))
{
res = nxt_unit_request_hash_add(ctx, req);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "failed to add request to hash");

nxt_unit_request_done(req, NXT_UNIT_ERROR);

return NXT_UNIT_ERROR;
}

/*
* If application have separate data handler, we may start
* request processing and process data when it is arrived.
Expand Down Expand Up @@ -1418,7 +1422,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;

req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
if (req == NULL) {
return NXT_UNIT_OK;
}
Expand Down Expand Up @@ -1722,6 +1726,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_OK;
}

static int
nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I remember, this line will need wrapping as it just goes over the 80 character limit...

{
nxt_unit_impl_t *lib;
nxt_unit_request_info_t *req;

req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);

if (req == NULL) {
return NXT_UNIT_OK;
}

lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

if (lib->callbacks.close_handler) {
lib->callbacks.close_handler(req);
}

return NXT_UNIT_OK;
}

static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
Expand Down