Skip to content

Commit 5fd3b4c

Browse files
committed
test: bypass jsforce request buffering
1 parent 9bc987e commit 5fd3b4c

4 files changed

Lines changed: 197 additions & 25 deletions

File tree

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@
135135
"csv-stringify": "^6.6.0",
136136
"form-data": "^4.0.5",
137137
"terminal-link": "^3.0.0",
138+
"undici": "^7.22.0",
138139
"zod": "^4.3.6"
139140
},
140141
"devDependencies": {

src/bulkUtils.ts

Lines changed: 87 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { Transform, Readable } from 'node:stream';
17+
import { Transform, Readable, TransformCallback } from 'node:stream';
1818
import { createInterface } from 'node:readline';
1919
import { pipeline } from 'node:stream/promises';
2020
import * as fs from 'node:fs';
2121
import { EOL } from 'node:os';
22-
import { HttpApi } from '@jsforce/jsforce-node/lib/http-api.js';
22+
import { fetch } from 'undici';
2323
import { HttpResponse } from '@jsforce/jsforce-node';
2424
import {
2525
IngestJobV2Results,
@@ -75,28 +75,91 @@ export enum ColumnDelimiter {
7575

7676
export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter;
7777

78-
async function bulkRequest(conn: Connection, url: string): Promise<{ body: string; headers: HttpResponse['headers'] }> {
79-
const httpApi = new HttpApi(conn, {
80-
responseType: 'text/plain', // this ensures jsforce doesn't try parsing the body
81-
});
78+
/**
 * Transform stream that drops the first line of its input (the CSV header row)
 * and forwards every byte after it unchanged.
 *
 * Used when appending subsequent bulk result pages to a file so the header is
 * not written again.
 *
 * The line terminator is located at the byte level: the LF byte (0x0A) never
 * occurs inside a multi-byte UTF-8 sequence, so searching raw bytes is safe and
 * avoids decoding/re-encoding chunks. Decoding per chunk would corrupt a
 * character whose bytes are split across two chunks. CRLF input works as well
 * because the CR precedes the LF and is discarded together with the header.
 *
 * If the input ends before any LF is seen, nothing is emitted.
 */
export class SkipFirstLineTransform extends Transform {
  /** Becomes true once the header's terminating LF has been consumed. */
  private firstLineSkipped = false;

  /**
   * @param chunk raw bytes from the upstream source
   * @param _encoding unused; chunks are handled as bytes
   * @param callback invoked with the bytes to forward (if any)
   */
  public _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    if (this.firstLineSkipped) {
      // Header already dropped: plain pass-through.
      callback(null, chunk);
      return;
    }

    const LINE_FEED = 0x0a;
    const newlineIndex = chunk.indexOf(LINE_FEED);

    if (newlineIndex === -1) {
      // Still inside the header line: discard the chunk, there is nothing worth keeping.
      callback();
      return;
    }

    this.firstLineSkipped = true;
    // Forward everything after the LF as-is (a view on the same bytes, no copy, no re-encoding).
    callback(null, chunk.subarray(newlineIndex + 1));
  }
}
122+
123+
/**
 * Performs a GET against a bulk API endpoint and returns the response body as a
 * Node.js stream together with the response headers.
 *
 * jsforce is bypassed on purpose: jsforce's Transport.httpRequest() adds a
 * 'complete' listener which triggers readAll() buffering. Calling undici's
 * `fetch` directly yields the raw response stream without intermediate buffering.
 *
 * Because jsforce is not involved, only `conn.accessToken` (when set) is sent as
 * a bearer token; callers are responsible for making sure it is still valid.
 *
 * @param conn connection used to normalize the URL and to read the access token
 * @param url bulk API URL (normalized through `conn.normalizeUrl`)
 * @returns the body as a `Readable` and the response headers as a plain object
 * @throws Error when the response status is not 2xx (message starts with `HTTP <status>: <statusText>`)
 * @throws Error when the response has no body
 */
async function bulkRequest(
  conn: Connection,
  url: string
): Promise<{ stream: Readable; headers: HttpResponse['headers'] }> {
  const normalizedUrl = conn.normalizeUrl(url);

  // Request headers. `Accept` states the format we want back; the original
  // content-type header is kept so the request stays a superset of what was sent before.
  const headers: { [name: string]: string } = {
    Accept: 'text/csv',
    'Content-Type': 'text/csv',
  };

  if (conn.accessToken) {
    headers.Authorization = `Bearer ${conn.accessToken}`;
  }

  const response = await fetch(normalizedUrl, {
    method: 'GET',
    headers,
  });

  if (!response.ok) {
    // Read the error payload: it carries the server's explanation, and consuming
    // it means the response body is not left dangling.
    let detail = '';
    try {
      detail = (await response.text()).trim();
    } catch {
      // Best effort only: the status line below is still reported.
    }
    const suffix = detail ? ` - ${detail}` : '';
    throw new Error(`HTTP ${response.status}: ${response.statusText}${suffix}`);
  }

  if (!response.body) {
    throw new Error('No body was returned');
  }
  const stream = Readable.fromWeb(response.body);

  // Copy the headers into a plain object, the format jsforce expects.
  const responseHeaders: HttpResponse['headers'] = {};
  response.headers.forEach((value: string, key: string) => {
    responseHeaders[key] = value;
  });

  return { stream, headers: responseHeaders };
}
101164

102165
export async function exportRecords(
@@ -124,6 +187,9 @@ export async function exportRecords(
124187

125188
let recordsWritten = 0;
126189

190+
// Refresh auth up front: `bulkRequest` bypasses jsforce and sends `conn.accessToken` via undici, so the token must be valid before fetching results.
191+
await conn.refreshAuth();
192+
127193
while (locator !== 'null') {
128194
// we can't parallelize this because we:
129195
// 1. need to get 1 batch to know the locator for the next one
@@ -151,7 +217,7 @@ export async function exportRecords(
151217

152218
// eslint-disable-next-line no-await-in-loop
153219
await pipeline(
154-
Readable.from(res.body),
220+
res.stream,
155221
new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }),
156222
new Transform({
157223
objectMode: true,
@@ -173,18 +239,15 @@ export async function exportRecords(
173239
await pipeline(
174240
locator
175241
? [
176-
// Skip the 1st row (CSV header) by finding the index of the first `LF`
177-
// occurence and move the position 1 char ahead.
178-
//
179-
// CSVs using `CRLF` are still handled correctly because `CR` and `LF` are different chars in the string.
180-
Readable.from(res.body.slice(res.body.indexOf('\n') + 1)),
242+
res.stream,
243+
new SkipFirstLineTransform(),
181244
fs.createWriteStream(outputInfo.filePath, {
182245
// Open file for appending. The file is created if it does not exist.
183246
// https://nodejs.org/api/fs.html#file-system-flags
184247
flags: 'a', // append mode
185248
}),
186249
]
187-
: [Readable.from(res.body), fs.createWriteStream(outputInfo.filePath)]
250+
: [res.stream, fs.createWriteStream(outputInfo.filePath)]
188251
);
189252
}
190253

test/bulkUtils.test.ts

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17+
import { Readable } from 'node:stream';
18+
import { pipeline } from 'node:stream/promises';
19+
1720
import { expect } from 'chai';
1821

19-
import { detectDelimiter } from '../src/bulkUtils.js';
22+
import { detectDelimiter, SkipFirstLineTransform } from '../src/bulkUtils.js';
2023

2124
describe('bulkUtils', () => {
2225
describe('csv', () => {
@@ -31,4 +34,104 @@ describe('bulkUtils', () => {
3134
expect(await detectDelimiter('./test/test-files/csv/tab.csv')).to.equal('TAB');
3235
});
3336
});
37+
38+
// NOTE: must be a plain `describe` — an exclusive `describe.only` would make Mocha
// skip every other suite in the run.
describe('SkipFirstLineTransform', () => {
  /** Collects everything a readable emits and returns it decoded as UTF-8. */
  async function streamToString(readable: Readable): Promise<string> {
    const chunks: Buffer[] = [];
    for await (const chunk of readable) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string));
    }
    return Buffer.concat(chunks).toString('utf8');
  }

  it('skips first line with LF endings', async () => {
    const input = 'Header1,Header2,Header3\nRow1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n';
    const expected = 'Row1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n';

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('skips first line with CRLF endings', async () => {
    const input = 'Header1,Header2,Header3\r\nRow1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n';
    const expected = 'Row1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n';

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('handles header split across chunks', async () => {
    // Simulate a stream where the header is split across multiple chunks
    const chunk1 = 'Header1,Head';
    const chunk2 = 'er2,Header3\nRow1Col1,Row1Col2,Row1Col3\n';
    const expected = 'Row1Col1,Row1Col2,Row1Col3\n';

    const input = Readable.from([chunk1, chunk2]);
    const result = await streamToString(input.pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('handles empty stream after header', async () => {
    const input = 'Header1,Header2,Header3\n';
    const expected = '';

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('handles single-line input without newline', async () => {
    // Edge case: header with no newline at all
    const input = 'Header1,Header2,Header3';
    const expected = '';

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('handles multi-byte UTF-8 characters in header', async () => {
    const input = 'Header1,Hëàdér2,Header3\nRow1Col1,Row1Col2,Row1Col3\n';
    const expected = 'Row1Col1,Row1Col2,Row1Col3\n';

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('handles very long header line', async () => {
    // Create a header with many columns
    const headerCols = Array.from({ length: 100 }, (_, i) => `Header${i}`).join(',');
    const dataCols = Array.from({ length: 100 }, (_, i) => `Data${i}`).join(',');
    const input = `${headerCols}\n${dataCols}\n`;
    const expected = `${dataCols}\n`;

    const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform()));

    expect(result).to.equal(expected);
  });

  it('passes through data correctly in pipeline', async () => {
    const input = 'Id,Name,Email\n1,John,john@example.com\n2,Jane,jane@example.com\n';
    const expected = '1,John,john@example.com\n2,Jane,jane@example.com\n';

    const chunks: string[] = [];
    await pipeline(
      Readable.from(input),
      new SkipFirstLineTransform(),
      // Final stage: record what comes out of the transform.
      async function* (source: AsyncIterable<Buffer>) {
        for await (const chunk of source) {
          const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string);
          chunks.push(buffer.toString('utf8'));
          yield chunk;
        }
      }
    );

    expect(chunks.join('')).to.equal(expected);
  });
});
34137
});

yarn.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7553,6 +7553,11 @@ undici-types@~6.19.2:
75537553
resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.19.6.tgz#e218c3df0987f4c0e0008ca00d6b6472d9b89b36"
75547554
integrity sha512-e/vggGopEfTKSvj4ihnOLTsqhrKRN3LeO6qSN/GxohhuRv8qH9bNQ4B8W7e/vFL+0XTnmHPB4/kegunZGA4Org==
75557555

7556+
undici@^7.22.0:
7557+
version "7.22.0"
7558+
resolved "https://registry.yarnpkg.com/undici/-/undici-7.22.0.tgz#7a82590a5908e504a47d85c60b0f89ca14240e60"
7559+
integrity sha512-RqslV2Us5BrllB+JeiZnK4peryVTndy9Dnqq62S3yYRRTj0tFQCwEniUy2167skdGOy3vqRzEvl1Dm4sV2ReDg==
7560+
75567561
unicorn-magic@^0.3.0:
75577562
version "0.3.0"
75587563
resolved "https://registry.yarnpkg.com/unicorn-magic/-/unicorn-magic-0.3.0.tgz#4efd45c85a69e0dd576d25532fbfa22aa5c8a104"

0 commit comments

Comments
 (0)