Skip to content

Commit 6cd6fca

Browse files
Previously, the app was not notified when the client disconnection.
This caused issues especially in cases of websocket connections and SSE Events where the app continued to send data to the router, which could not deliver it to the client due to the disconnection. Changes made: Added functionality to send a port message to notify the app of client disconnection in form of port message(_NXT_PORT_MSG_CLIENT_ERROR). On the App side, handled this message and called the registered close_hanlder callback if registered.
1 parent b4201ab commit 6cd6fca

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

Diff for: src/nxt_port.h

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
5959
/* Status report. */
6060
nxt_port_handler_t status;
6161

62+
nxt_port_handler_t client_error;
63+
6264
nxt_port_handler_t oosm;
6365
nxt_port_handler_t shm_ack;
6466
nxt_port_handler_t read_queue;
@@ -115,6 +117,7 @@ typedef enum {
115117
_NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
116118
_NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
117119

120+
_NXT_PORT_MSG_CLIENT_ERROR = nxt_port_handler_idx(client_error),
118121
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
119122
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
120123
_NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -159,6 +162,7 @@ typedef enum {
159162
NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
160163
NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
161164
NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
165+
NXT_PORT_MSG_CLIENT_ERROR = nxt_msg_last(_NXT_PORT_MSG_CLIENT_ERROR),
162166

163167
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
164168
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),

Diff for: src/nxt_router.c

+10-1
Original file line numberDiff line numberDiff line change
@@ -5300,7 +5300,16 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
53005300
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
53015301

53025302
if (r->req_rpc_data != NULL) {
5303-
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5303+
nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
5304+
5305+
if (r->error) {
5306+
nxt_port_socket_write(task, req_rpc_data->app_port,
5307+
NXT_PORT_MSG_CLIENT_ERROR,
5308+
-1, req_rpc_data->stream,
5309+
task->thread->engine->port->id, NULL);
5310+
}
5311+
5312+
nxt_request_rpc_data_unlink(task, req_rpc_data);
53045313
}
53055314

53065315
nxt_http_request_close_handler(task, r, r->proto.any);

Diff for: src/nxt_unit.c

+33-10
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
7474
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
7575
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
7676
nxt_unit_recv_msg_t *recv_msg);
77+
static int nxt_unit_process_client_error(nxt_unit_ctx_t *ctx,
78+
nxt_unit_recv_msg_t *recv_msg);
7779
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
7880
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
7981
nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,9 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
11211123
rc = nxt_unit_process_websocket(ctx, &recv_msg);
11221124
break;
11231125

1126+
case _NXT_PORT_MSG_CLIENT_ERROR:
1127+
rc = nxt_unit_process_client_error(ctx, &recv_msg);
1128+
break;
11241129
case _NXT_PORT_MSG_REMOVE_PID:
11251130
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
11261131
nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1377,18 +1382,16 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
13771382

13781383
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
13791384

1385+
res = nxt_unit_request_hash_add(ctx, req);
1386+
if (nxt_slow_path(res != NXT_UNIT_OK)) {
1387+
nxt_unit_req_warn(req, "failed to add request to hash");
1388+
nxt_unit_request_done(req, NXT_UNIT_ERROR);
1389+
return NXT_UNIT_ERROR;
1390+
}
1391+
13801392
if (req->content_length
13811393
> (uint64_t) (req->content_buf->end - req->content_buf->free))
13821394
{
1383-
res = nxt_unit_request_hash_add(ctx, req);
1384-
if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385-
nxt_unit_req_warn(req, "failed to add request to hash");
1386-
1387-
nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388-
1389-
return NXT_UNIT_ERROR;
1390-
}
1391-
13921395
/*
13931396
* If application have separate data handler, we may start
13941397
* request processing and process data when it is arrived.
@@ -1418,7 +1421,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
14181421
nxt_unit_mmap_buf_t *b;
14191422
nxt_unit_request_info_t *req;
14201423

1421-
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1424+
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
14221425
if (req == NULL) {
14231426
return NXT_UNIT_OK;
14241427
}
@@ -1722,6 +1725,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
17221725
return NXT_UNIT_OK;
17231726
}
17241727

1728+
static int
1729+
nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1730+
{
1731+
nxt_unit_request_info_t *req;
1732+
nxt_unit_impl_t *lib;
1733+
1734+
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
1735+
1736+
if (req == NULL) {
1737+
return NXT_UNIT_OK;
1738+
}
1739+
1740+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1741+
1742+
if (lib->callbacks.close_handler) {
1743+
lib->callbacks.close_handler(req);
1744+
}
1745+
1746+
return NXT_UNIT_OK;
1747+
}
17251748

17261749
static int
17271750
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)

0 commit comments

Comments
 (0)