Skip to content

Commit 7e9dfc9

Browse files
authored
Merge pull request #1332 from mdesousa/input-output-webstream
fix: log web stream responses
2 parents 341086f + 03e55ca commit 7e9dfc9

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

packages/input-output-logger/index.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Transform } from "node:stream";
2+
import { TransformStream } from "node:stream/web";
23

34
const defaults = {
45
logger: (message) => {
@@ -65,11 +66,19 @@ const inputOutputLoggerMiddleware = (opts = {}) => {
6566
omitAndLog("event", request);
6667
};
6768
const inputOutputLoggerMiddlewareAfter = async (request) => {
69+
// Check for Node.js stream
6870
if (
6971
request.response?._readableState ??
7072
request.response?.body?._readableState
7173
) {
7274
passThrough(request, omitAndLog);
75+
}
76+
// Check for Web stream
77+
else if (
78+
request.response instanceof ReadableStream ||
79+
request.response?.body instanceof ReadableStream
80+
) {
81+
passThroughWebStream(request, omitAndLog);
7382
} else {
7483
omitAndLog("response", request);
7584
}
@@ -162,4 +171,39 @@ const passThrough = (request, omitAndLog) => {
162171
}
163172
};
164173

174+
// Handler for Web Streams API
175+
const passThroughWebStream = (request, omitAndLog) => {
176+
const hasBody = request.response?.body;
177+
let body = "";
178+
179+
const transformer = new TransformStream({
180+
transform(chunk, controller) {
181+
// For web streams, chunks could be various types
182+
const textChunk =
183+
typeof chunk === "string"
184+
? chunk
185+
: chunk instanceof Uint8Array
186+
? new TextDecoder().decode(chunk)
187+
: String(chunk);
188+
body += textChunk;
189+
controller.enqueue(chunk);
190+
},
191+
flush(controller) {
192+
if (hasBody) {
193+
omitAndLog("response", { response: { ...request.response, body } });
194+
} else {
195+
omitAndLog("response", { response: body });
196+
}
197+
},
198+
});
199+
200+
if (hasBody) {
201+
// Handle response with body property that's a ReadableStream
202+
request.response.body = request.response.body.pipeThrough(transformer);
203+
} else {
204+
// Handle response that's directly a ReadableStream
205+
request.response = request.response.pipeThrough(transformer);
206+
}
207+
};
208+
165209
export default inputOutputLoggerMiddleware;

packages/input-output-logger/index.test.js

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,96 @@ test("It should log with streamifyResponse:true using body ReadableStream", asyn
116116
]);
117117
});
118118

119+
test("It should log with Web Streams API using ReadableStream", async (t) => {
120+
const input = "x".repeat(1024 * 1024);
121+
const logger = t.mock.fn();
122+
const handler = middy(
123+
async (event, context, { signal }) => {
124+
// Create a Web ReadableStream
125+
const stream = new ReadableStream({
126+
start(controller) {
127+
controller.enqueue(input);
128+
controller.close();
129+
},
130+
});
131+
return stream;
132+
},
133+
{
134+
streamifyResponse: true,
135+
},
136+
).use(
137+
inputOutputLogger({
138+
logger,
139+
}),
140+
);
141+
142+
const event = {};
143+
let chunkResponse = "";
144+
const responseStream = createWritableStream((chunk) => {
145+
chunkResponse += chunk;
146+
});
147+
const response = await handler(event, responseStream, context);
148+
equal(response, undefined);
149+
equal(chunkResponse, input);
150+
deepEqual(logger.mock.calls[0].arguments, [{ event: {} }]);
151+
deepEqual(logger.mock.calls[1].arguments, [
152+
{
153+
response: input,
154+
},
155+
]);
156+
});
157+
158+
test("It should log with Web Streams API using body ReadableStream", async (t) => {
159+
const input = "x".repeat(1024 * 1024);
160+
const logger = t.mock.fn();
161+
const handler = middy(
162+
async (event, context, { signal }) => {
163+
// Create a Web ReadableStream in the body
164+
const stream = new ReadableStream({
165+
start(controller) {
166+
controller.enqueue(input);
167+
controller.close();
168+
},
169+
});
170+
return {
171+
statusCode: 200,
172+
headers: {
173+
"Content-Type": "plain/text",
174+
},
175+
body: stream,
176+
};
177+
},
178+
{
179+
streamifyResponse: true,
180+
},
181+
).use(
182+
inputOutputLogger({
183+
logger,
184+
}),
185+
);
186+
187+
const event = {};
188+
let chunkResponse = "";
189+
const responseStream = createWritableStream((chunk) => {
190+
chunkResponse += chunk;
191+
});
192+
const response = await handler(event, responseStream, context);
193+
equal(response, undefined);
194+
equal(chunkResponse, input);
195+
deepEqual(logger.mock.calls[0].arguments, [{ event: {} }]);
196+
deepEqual(logger.mock.calls[1].arguments, [
197+
{
198+
response: {
199+
statusCode: 200,
200+
headers: {
201+
"Content-Type": "plain/text",
202+
},
203+
body: input,
204+
},
205+
},
206+
]);
207+
});
208+
119209
test("It should throw error when invalid logger", async (t) => {
120210
const logger = false;
121211

0 commit comments

Comments
 (0)