diff --git a/index.js b/index.js index fc1a5e4..faee49d 100644 --- a/index.js +++ b/index.js @@ -8,11 +8,11 @@ import http from 'http' import https from 'https' class Rewrite extends EventEmitter { - description () { + description() { return 'URL Rewriting. Use to re-route requests to local or remote resources.' } - optionDefinitions () { + optionDefinitions() { return [ { name: 'rewrite', @@ -30,16 +30,24 @@ class Rewrite extends EventEmitter { ] } - middleware (options, lws) { + middleware(options, lws) { const rules = util.parseRewriteRules(options.rewrite) + let httpProxyAgent, httpsProxyAgent + const httpProxy = process.env.http_proxy + if (httpProxy) { + httpsProxyAgent = new HttpsProxyAgent(httpProxy) + httpProxyAgent = new HttpProxyAgent(httpProxy) + } if (rules.length) { this.emit('verbose', 'middleware.rewrite.config', { rewrite: rules }) + /* attach websocket proxy (upgrade) handler once if there are any remote-capable rules */ + setupWebSocketProxy(rules, this, lws, { httpProxyAgent, httpsProxyAgent }) /* return one middleware per defined rewrite rule */ return rules.map(rule => { if (rule.to) { /* `to` address is remote if the url specifies a host */ if (url.parse(rule.to).host) { - return _.all(rule.from, proxyRequest(rule, this, lws)) + return _.all(rule.from, proxyRequest(rule, this, lws, { httpProxyAgent, httpsProxyAgent })) } else { const rmw = rewrite(rule.from, rule.to, this) return rmw @@ -50,17 +58,112 @@ class Rewrite extends EventEmitter { } } -function proxyRequest (route, mw, lws) { - let id = 1 +function setupWebSocketProxy(rules, mw, lws, { httpProxyAgent, httpsProxyAgent }) { + /* only attach if there’s at least one rule pointing to a remote host (http/https/ws/wss) */ + const hasRemoteRule = rules.some(r => { + if (!r || !r.to) return false + const parsed = url.parse(r.to) + return !!parsed.host && /^(https?:|wss?:)$/.test(parsed.protocol || 'http:') + }) + if (!hasRemoteRule) return + + let attached = false - let httpProxyAgent, httpsProxyAgent - const httpProxy = process.env.http_proxy - if (httpProxy) { - httpsProxyAgent = new HttpsProxyAgent(httpProxy) - httpProxyAgent = new HttpProxyAgent(httpProxy) + function attachUpgrade(server) { + if (attached || !server) return + attached = true + + server.on('upgrade', (req, socket, head) => { + /* find a remote rule which matches the request */ + const remoteRule = rules.find(rule => { + if (rule.to && url.parse(rule.to).host) { + const re = util.pathToRegexp(rule.from) + return re.test(req.url) + } + }) + + if (remoteRule) { + const targetUrl = util.getTargetUrl(remoteRule.from, remoteRule.to, req.url) + mw.emit('verbose', 'middleware.rewrite.ws.proxy', { from: req.url, to: targetUrl }) + + const remoteReqOptions = url.parse(targetUrl) + remoteReqOptions.headers = req.headers + remoteReqOptions.rejectUnauthorized = false + if (remoteReqOptions.protocol === 'ws:') { + remoteReqOptions.protocol = 'http:' + } else if (remoteReqOptions.protocol === 'wss:') { + remoteReqOptions.protocol = 'https:' + } + + let transport + const protocol = remoteReqOptions.protocol + if (protocol === 'http:') { + transport = http + remoteReqOptions.agent = httpProxyAgent + } else if (protocol === 'https:') { + transport = https + remoteReqOptions.agent = httpsProxyAgent + } + + const remoteReq = transport.request(remoteReqOptions) + + remoteReq.on('response', (res) => { + /* the remote server sent a regular http response, not an upgrade, write it back to the client */ + let headers = '' + for (const [key, value] of Object.entries(res.headers)) { + headers += `${key}: ${value}\r\n` + } + socket.write(`HTTP/1.1 ${res.statusCode} ${res.statusMessage}\r\n${headers}\r\n`) + res.pipe(socket) + }) + + remoteReq.on('upgrade', (remoteRes, remoteSocket, remoteHead) => { + /* write the upgrade response from target back to client */ + let response = `HTTP/1.1 ${remoteRes.statusCode} ${remoteRes.statusMessage}\r\n` + for (let i = 0; i < remoteRes.rawHeaders.length; i += 2) { + response += `${remoteRes.rawHeaders[i]}: ${remoteRes.rawHeaders[i + 1]}\r\n` + } + response += '\r\n' + socket.write(response) + + if (remoteHead && remoteHead.length) remoteSocket.write(remoteHead) + if (head && head.length) socket.write(head) + + remoteSocket.pipe(socket).pipe(remoteSocket) + + remoteSocket.on('error', (err) => { mw.emit('error', 'middleware.rewrite.ws.remote-socket-error', { err }); socket.destroy() }) + socket.on('error', (err) => { mw.emit('error', 'middleware.rewrite.ws.client-socket-error', { err }); remoteSocket.destroy() }) + remoteSocket.on('close', () => socket.destroy()) + socket.on('close', () => remoteSocket.destroy()) + }) + + remoteReq.on('error', (err) => { + mw.emit('error', 'middleware.rewrite.ws.error', { err }) + socket.end('HTTP/1.1 502 Bad Gateway\r\n\r\n') + }) + + remoteReq.end() + } + }) + } + + if (lws && lws.server) { + attachUpgrade(lws.server) + } else { + /* server may not exist yet – try attaching shortly after */ + const tryAttach = () => { + if (lws && lws.server) attachUpgrade(lws.server) + else setTimeout(tryAttach, 10) + } + tryAttach() } +} + + +function proxyRequest(route, mw, lws, { httpProxyAgent, httpsProxyAgent }) { + let id = 1 - return function proxyMiddleware (ctx) { + return function proxyMiddleware(ctx) { return new Promise((resolve, reject) => { const isHttp2 = ctx.req.httpVersion === '2.0' ctx.state.id = id++ @@ -163,7 +266,7 @@ function proxyRequest (route, mw, lws) { } } -function rewrite (from, to, mw) { +function rewrite(from, to, mw) { return async function (ctx, next) { const targetUrl = util.getTargetUrl(from, to, ctx.url) if (ctx.url !== targetUrl) { diff --git a/lib/util.js b/lib/util.js index 596f473..d4c5a36 100644 --- a/lib/util.js +++ b/lib/util.js @@ -1,7 +1,7 @@ import arrayify from 'array-back' import { pathToRegexp } from 'path-to-regexp' -function parseRewriteRules (rules) { +function parseRewriteRules(rules) { return arrayify(rules).map(rule => { if (typeof rule === 'string') { const matches = rule.match(/(\S*)\s*->\s*(\S*)/) @@ -16,7 +16,7 @@ function parseRewriteRules (rules) { }) } -function getTargetUrl (from, to, url) { +function getTargetUrl(from, to, url) { const fromParams = [] const re = pathToRegexp(from, fromParams) const fromMatches = re.exec(url) @@ -53,19 +53,19 @@ function getTargetUrl (from, to, url) { } } -function removeHopSpecificHeaders (headers) { +function removeHopSpecificHeaders(headers) { const hopSpecificHeaders = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailer', 'transfer-encoding', 'upgrade'] for (const hopHeader of hopSpecificHeaders) { delete headers[hopHeader] } } -function removeCookieAttribute (cookie = '', attr) { +function removeCookieAttribute(cookie = '', attr) { return cookie.split(';') .map(a => a.trim()) .filter(a => a.toLowerCase() !== attr) .join('; ') } -export { parseRewriteRules, getTargetUrl, removeHopSpecificHeaders, removeCookieAttribute } -export default { parseRewriteRules, getTargetUrl, removeHopSpecificHeaders, removeCookieAttribute } +export { parseRewriteRules, getTargetUrl, removeHopSpecificHeaders, removeCookieAttribute, pathToRegexp } +export default { parseRewriteRules, getTargetUrl, removeHopSpecificHeaders, removeCookieAttribute, pathToRegexp } diff --git a/package-lock.json b/package-lock.json index 3a29514..5127691 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,7 +25,8 @@ "lws-err-detail": "^2.0.0", "lws-static": "^3.1.0", "node-fetch": "^3.3.2", - "test-runner": "^0.11.0" + "test-runner": "^0.11.0", + "ws": "^8.18.3" }, "engines": { "node": ">=12.17" @@ -2039,6 +2040,28 @@ "node": ">=12.17" } }, + "node_modules/ws": { + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/ylru": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/ylru/-/ylru-1.4.0.tgz", diff --git a/package.json b/package.json index 0481f7e..be9be5c 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,8 @@ "lws-err-detail": "^2.0.0", "lws-static": "^3.1.0", "node-fetch": "^3.3.2", - "test-runner": "^0.11.0" + "test-runner": "^0.11.0", + "ws": "^8.18.3" }, "files": [ "index.js", diff --git a/test/ws.js b/test/ws.js new file mode 100644 index 0000000..59d5067 --- /dev/null +++ b/test/ws.js @@ -0,0 +1,110 @@ +import { strict as a } from 'assert' +import TestRunner from 'test-runner' +import Rewrite from 'lws-rewrite' +import Static from 'lws-static' +import Lws from 'lws' +import WebSocket, { WebSocketServer } from 'ws' + +const tom = new TestRunner.Tom('websocket') + +tom.test('proxy WS echo', async function () { + const remotePort = 12000 + this.index + const localPort = 8300 + this.index + + /* remote WS echo server */ + const remoteWss = new WebSocketServer({ port: remotePort }) + await new Promise(resolve => remoteWss.on('listening', resolve)) + remoteWss.on('connection', ws => { + ws.on('message', msg => ws.send(msg)) + }) + + const lws = await Lws.create({ + port: localPort, + stack: [Rewrite, Static], + rewrite: { from: '/ws/(.*)', to: `ws://localhost:${remotePort}/$1` } + }) + + try { + const client = new WebSocket(`ws://localhost:${localPort}/ws/echo`) + const msg = await new Promise((resolve, reject) => { + client.once('open', () => client.send('hello')) + client.once('message', data => resolve(data.toString())) + client.once('error', reject) + }) + a.equal(msg, 'hello') + client.close() + } finally { + lws.server.close() + remoteWss.close() + } +}, { timeout: 120000 }) + +tom.test('proxy WS with path tokens', async function () { + const remotePort = 12000 + this.index + const localPort = 8300 + this.index + let lastRequestedPath = null + + /* remote WS echo server that records request path */ + const remoteWss = new WebSocketServer({ port: remotePort }) + await new Promise(resolve => remoteWss.on('listening', resolve)) + remoteWss.on('connection', (ws, req) => { + lastRequestedPath = req.url + ws.on('message', msg => ws.send(msg)) + }) + + const lws = await Lws.create({ + port: localPort, + stack: [Rewrite, Static], + rewrite: { from: '/ws/:room/:id', to: `ws://localhost:${remotePort}/rooms/:room?id=:id` } + }) + + try { + const client = new WebSocket(`ws://localhost:${localPort}/ws/chat/42`) + const msg = await new Promise((resolve, reject) => { + client.once('open', () => client.send('hey')) + client.once('message', data => resolve(data.toString())) + client.once('error', reject) + }) + a.equal(msg, 'hey') + a.equal(lastRequestedPath, '/rooms/chat?id=42') + client.close() + } finally { + lws.server.close() + remoteWss.close() + } +}, { timeout: 120000 }) + +tom.test('proxy WS when rule target is http (auto-convert to ws)', async function () { + const remotePort = 12000 + this.index + const localPort = 8300 + this.index + + /* remote WS echo server */ + const remoteWss = new WebSocketServer({ port: remotePort }) + await new Promise(resolve => remoteWss.on('listening', resolve)) + remoteWss.on('connection', ws => { + ws.on('message', msg => ws.send(msg)) + }) + + /* note the `http://` target – should be auto-converted to ws:// for the upgrade path */ + const lws = await Lws.create({ + port: localPort, + stack: [Rewrite, Static], + rewrite: { from: '/ws/(.*)', to: `http://localhost:${remotePort}/$1` } + }) + + try { + const client = new WebSocket(`ws://localhost:${localPort}/ws/echo2`) + const msg = await new Promise((resolve, reject) => { + client.once('open', () => client.send('auto')) + client.once('message', data => resolve(data.toString())) + client.once('error', reject) + }) + a.equal(msg, 'auto') + client.close() + } finally { + lws.server.close() + remoteWss.close() + } +}, { timeout: 120000 }) + +export default tom \ No newline at end of file