Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/2.adapters/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,35 @@ server.on("upgrade", (req, socket, head) => {
See [`test/fixture/node.ts`](https://github.com/h3js/crossws/blob/main/test/fixture/node.ts) for demo and [`src/adapters/node.ts`](https://github.com/h3js/crossws/blob/main/src/adapters/node.ts) for implementation.
::

## Delegating to an existing Node.js upgrade handler

If you already have a Node.js WebSocket library that exposes a raw `(req, socket, head)` upgrade handler (e.g. [`ws`](https://github.com/websockets/ws), `socket.io`, `express-ws`), you can route to it through crossws using `fromNodeUpgradeHandler`. This lets you keep crossws's upgrade-time request handling while delegating the WebSocket lifecycle to your existing library.

```ts
import { WebSocketServer } from "ws";
import { fromNodeUpgradeHandler } from "crossws/adapters/node";
import { serve } from "crossws/server/node";

const wss = new WebSocketServer({ noServer: true });
wss.on("connection", (ws) => {
ws.on("message", (data) => ws.send(data));
});

serve({
fetch: () => new Response("ok"),
websocket: fromNodeUpgradeHandler((req, socket, head) => {
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
}),
});
```

The underlying handler takes full ownership of the socket, so crossws's other lifecycle hooks (`open`, `message`, `close`, `error`) are **not** invoked for connections routed through it — manage the WebSocket lifecycle inside your own library as usual.

> [!NOTE]
> `fromNodeUpgradeHandler` only works on the Node.js runtime, and must be used via the crossws node server plugin so the request carries `runtime.node.upgrade.{socket, head}`.
Comment thread
pi0 marked this conversation as resolved.

## uWebSockets

You can alternatively use [uWebSockets.js](https://github.com/uNetworking/uWebSockets.js) for Node.js servers.
Expand Down
11 changes: 10 additions & 1 deletion src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,17 @@ const nodeAdapter: Adapter<NodeAdapter, NodeOptions> = (options = {}) => {
handleUpgrade: async (nodeReq, socket, head, webRequest) => {
const request = webRequest || new NodeReqProxy(nodeReq);

const { upgradeHeaders, endResponse, context, namespace } =
const { upgradeHeaders, endResponse, handled, context, namespace } =
await hooks.upgrade(request);
if (endResponse) {
return sendResponse(socket, endResponse);
}
// Upgrade was performed by the hook (e.g. delegated to an external
// node-style handler via `fromNodeUpgradeHandler`). The socket has
// been taken over — leave it alone.
if (handled) {
return;
}

(nodeReq as AugmentedReq)._request = request;
(nodeReq as AugmentedReq)._upgradeHeaders = upgradeHeaders;
Expand All @@ -138,6 +144,9 @@ const nodeAdapter: Adapter<NodeAdapter, NodeOptions> = (options = {}) => {

export default nodeAdapter;

export { fromNodeUpgradeHandler } from "../node-handler.ts";
export type { NodeUpgradeHandler } from "../node-handler.ts";

// --- peer ---

class NodePeer extends Peer<{
Expand Down
15 changes: 14 additions & 1 deletion src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class AdapterHookable {
namespace: string;
upgradeHeaders?: HeadersInit;
endResponse?: Response;
handled?: boolean;
}> {
let namespace =
this.options.getNamespace?.(request) ?? new URL(request.url).pathname;
Expand All @@ -74,6 +75,9 @@ export class AdapterHookable {
if (res instanceof Response) {
return { context, namespace, endResponse: res };
}
if ((res as { handled?: boolean }).handled) {
return { context, namespace, handled: true };
}
if (res.headers) {
return {
context,
Expand Down Expand Up @@ -120,6 +124,10 @@ export interface Hooks {
* - You can return { headers } to modify the response.
* - You can return { namespace } to change the pub/sub namespace.
* - You can return { context } to provide a custom peer context.
* - You can return { handled: true } to signal that the upgrade has
* already been performed by the hook (e.g. delegated to an external
* node-style `(req, socket, head)` handler). The adapter will then
* leave the socket alone and skip its own upgrade.
*
* @param request
* @throws {Response}
Expand All @@ -129,7 +137,12 @@ export interface Hooks {
readonly context?: Record<string, unknown>;
},
) => MaybePromise<
| { headers?: HeadersInit; namespace?: string; context?: PeerContext }
| {
headers?: HeadersInit;
namespace?: string;
context?: PeerContext;
handled?: boolean;
}
| Response
| void
>;
Expand Down
64 changes: 64 additions & 0 deletions src/node-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { IncomingMessage } from "node:http";
import type { Duplex } from "node:stream";
import type { Hooks } from "./hooks.ts";

/**
* A Node.js `(req, socket, head)` upgrade handler.
*/
export type NodeUpgradeHandler = (
req: IncomingMessage,
socket: Duplex,
head: Buffer,
) => void | Promise<void>;

/**
* Wrap a Node.js `(req, socket, head)` upgrade handler as a {@link Hooks}
* object that can be mounted via `crossws/server/node`.
*
* The wrapped handler takes ownership of the socket; crossws's other
* lifecycle hooks (`open`/`message`/`close`/`error`) are **not** invoked
* for connections routed through it.
*
* @example
* ```ts
* import { WebSocketServer } from "ws";
* import { fromNodeUpgradeHandler } from "crossws/adapters/node";
* import { serve } from "crossws/server/node";
*
* const wss = new WebSocketServer({ noServer: true });
* wss.on("connection", (ws) => {
* ws.on("message", (data) => ws.send(data));
* });
*
* serve({
* websocket: fromNodeUpgradeHandler((req, socket, head) => {
* wss.handleUpgrade(req, socket, head, (ws) => {
* wss.emit("connection", ws, req);
* });
* }),
* fetch: () => new Response("ok"),
* });
* ```
*/
export function fromNodeUpgradeHandler(
handler: NodeUpgradeHandler,
): Partial<Hooks> {
return {
async upgrade(request) {
const node = (request as { runtime?: { node?: NodeUpgradeCtx } }).runtime
?.node;
if (!node?.upgrade) {
throw new Error(
"[crossws] `fromNodeUpgradeHandler` must be mounted via `crossws/server/node`.",
);
}
await handler(node.req, node.upgrade.socket, node.upgrade.head);
return { handled: true };
},
};
}

interface NodeUpgradeCtx {
req: IncomingMessage;
upgrade?: { socket: Duplex; head: Buffer };
}
115 changes: 115 additions & 0 deletions test/node-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { once } from "node:events";
import { getRandomPort } from "get-port-please";
import { afterEach, beforeEach, expect, test } from "vitest";
import { WebSocketServer } from "ws";
import WebSocket from "ws";
import { fromNodeUpgradeHandler } from "../src/node-handler.ts";
import { serve } from "../src/server/node.ts";

type ServeReturn = ReturnType<typeof serve>;

let currentServer: ServeReturn | undefined;
let currentWss: WebSocketServer | undefined;
let unhandled: unknown[] = [];

function onUnhandled(err: unknown) {
unhandled.push(err);
}

beforeEach(() => {
unhandled = [];
process.on("unhandledRejection", onUnhandled);
process.on("uncaughtException", onUnhandled);
});

afterEach(async () => {
process.off("unhandledRejection", onUnhandled);
process.off("uncaughtException", onUnhandled);
currentWss?.close();
await currentServer?.close(true);
currentServer = undefined;
currentWss = undefined;
// Give any stray async errors a tick to surface before asserting.
await new Promise((r) => setImmediate(r));
if (unhandled.length > 0) {
throw new AggregateError(
unhandled as Error[],
`Unexpected unhandled errors during test: ${unhandled
.map((e) => (e as Error)?.message ?? String(e))
.join("; ")}`,
);
}
});

test("fromNodeUpgradeHandler delegates upgrade to a ws.WebSocketServer", async () => {
const wss = new WebSocketServer({ noServer: true });
currentWss = wss;
const receivedOnUpstream: string[] = [];
wss.on("connection", (ws) => {
ws.on("message", (data) => {
receivedOnUpstream.push(data.toString());
ws.send(`echo:${data.toString()}`);
});
});

const port = await getRandomPort("localhost");
const server = serve({
port,
hostname: "127.0.0.1",
fetch: () => new Response("ok"),
websocket: fromNodeUpgradeHandler((req, socket, head) => {
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
}),
});
currentServer = server;
await server.ready();

const client = new WebSocket(`ws://127.0.0.1:${port}/`);
await once(client, "open");
client.send("hello");
const [reply] = await once(client, "message");
expect(reply.toString()).toBe("echo:hello");
expect(receivedOnUpstream).toEqual(["hello"]);
client.close();
await once(client, "close");
});

test("fromNodeUpgradeHandler does not invoke node adapter's own handleUpgrade", async () => {
// If the handoff sentinel is ignored, the node adapter would try to run
// ws.handleUpgrade on a socket that the user's handler has already taken
// over — the client would see a connection failure or duplicate upgrade.
// This test catches that regression by asserting the upstream handler is
// the *only* thing that opens the WebSocket.
const wss = new WebSocketServer({ noServer: true });
currentWss = wss;
let upstreamOpens = 0;
wss.on("connection", (ws) => {
upstreamOpens++;
ws.on("message", (data) => ws.send(data));
});

const port = await getRandomPort("localhost");
const server = serve({
port,
hostname: "127.0.0.1",
fetch: () => new Response("ok"),
websocket: fromNodeUpgradeHandler((req, socket, head) => {
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
}),
});
currentServer = server;
await server.ready();

const client = new WebSocket(`ws://127.0.0.1:${port}/`);
await once(client, "open");
client.send("ping");
const [reply] = await once(client, "message");
expect(reply.toString()).toBe("ping");
expect(upstreamOpens).toBe(1);
client.close();
await once(client, "close");
});
Loading