From 4e9654428c86124101a2a7f4091b7d39b2755e41 Mon Sep 17 00:00:00 2001 From: uasan Date: Mon, 18 Nov 2024 14:25:03 +0100 Subject: [PATCH 1/6] feat(csv-parse): implement TransformStream --- packages/csv-parse/lib/stream.d.ts | 11 +++++++++ packages/csv-parse/lib/stream.js | 39 +++++++++++++----------------- packages/csv-parse/package.json | 12 ++++++++- 3 files changed, 39 insertions(+), 23 deletions(-) create mode 100644 packages/csv-parse/lib/stream.d.ts diff --git a/packages/csv-parse/lib/stream.d.ts b/packages/csv-parse/lib/stream.d.ts new file mode 100644 index 00000000..f3b4068f --- /dev/null +++ b/packages/csv-parse/lib/stream.d.ts @@ -0,0 +1,11 @@ + +import { Options } from './index.js'; + +declare function parse(options?: Options): TransformStream; +// export default parse; +export { parse }; + +export { + CastingContext, CastingFunction, CastingDateFunction, + ColumnOption, Options, Info, CsvErrorCode, CsvError +} from './index.js'; diff --git a/packages/csv-parse/lib/stream.js b/packages/csv-parse/lib/stream.js index 8594af1b..fd9f79f5 100644 --- a/packages/csv-parse/lib/stream.js +++ b/packages/csv-parse/lib/stream.js @@ -3,30 +3,25 @@ import { transform } from "./api/index.js"; const parse = (opts) => { const api = transform(opts); + + let controller; + + const enqueue = (record) => { + controller.enqueue(record); + }; + const close = () => { + controller.close(); + }; + return new TransformStream({ - async transform(chunk, controller) { - api.parse( - chunk, - false, - (record) => { - controller.enqueue(record); - }, - () => { - controller.close(); - }, - ); + start(ctr) { + controller = ctr; + }, + transform(chunk) { + api.parse(chunk, false, enqueue, close); }, - async flush(controller) { - api.parse( - undefined, - true, - (record) => { - controller.enqueue(record); - }, - () => { - controller.close(); - }, - ); + flush() { + api.parse(undefined, true, enqueue, close); }, }); }; diff --git a/packages/csv-parse/package.json b/packages/csv-parse/package.json index 68b83988..5bb3f6b7 100644 --- a/packages/csv-parse/package.json +++ b/packages/csv-parse/package.json @@ -52,6 +52,16 @@ "default": "./dist/cjs/sync.cjs" } }, + "./stream": { + "import": { + "types": "./lib/stream.d.ts", + "default": "./lib/stream.js" + }, + "require": { + "types": "./dist/cjs/stream.d.cts", + "default": "./dist/cjs/stream.cjs" + } + }, "./browser/esm": { "types": "./dist/esm/index.d.ts", "default": "./dist/esm/index.js" @@ -143,4 +153,4 @@ ] } } -} +} \ No newline at end of file From 8a2d6239c038f02c2ff89cdfa03753ffa8169aa5 Mon Sep 17 00:00:00 2001 From: uasan Date: Mon, 18 Nov 2024 18:05:21 +0100 Subject: [PATCH 2/6] feat(csv-parse): add test api.web_stream and fix controller.terminate --- packages/csv-parse/dist/esm/stream.d.ts | 11 ++++++++ packages/csv-parse/lib/stream.js | 29 ++++++++++++--------- packages/csv-parse/test/api.web_stream.ts | 31 +++++++++++++++++++++++ 3 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 packages/csv-parse/dist/esm/stream.d.ts create mode 100644 packages/csv-parse/test/api.web_stream.ts diff --git a/packages/csv-parse/dist/esm/stream.d.ts b/packages/csv-parse/dist/esm/stream.d.ts new file mode 100644 index 00000000..f3b4068f --- /dev/null +++ b/packages/csv-parse/dist/esm/stream.d.ts @@ -0,0 +1,11 @@ + +import { Options } from './index.js'; + +declare function parse(options?: Options): TransformStream; +// export default parse; +export { parse }; + +export { + CastingContext, CastingFunction, CastingDateFunction, + ColumnOption, Options, Info, CsvErrorCode, CsvError +} from './index.js'; diff --git a/packages/csv-parse/lib/stream.js b/packages/csv-parse/lib/stream.js index fd9f79f5..70b1273a 100644 --- a/packages/csv-parse/lib/stream.js +++ b/packages/csv-parse/lib/stream.js @@ -9,21 +9,26 @@ const parse = (opts) => { const enqueue = (record) => { controller.enqueue(record); }; - const close = () => { - controller.close(); + + const terminate = () => { + controller.terminate(); }; - return new TransformStream({ - start(ctr) { - controller = ctr; - }, - transform(chunk) { - api.parse(chunk, false, enqueue, close); - }, - flush() { - api.parse(undefined, true, enqueue, close); + return new TransformStream( + { + start(ctr) { + controller = ctr; + }, + transform(chunk) { + api.parse(chunk, false, enqueue, terminate); + }, + flush() { + api.parse(undefined, true, enqueue, terminate); + }, }, - }); + new CountQueuingStrategy({ highWaterMark: 1024 }), + new CountQueuingStrategy({ highWaterMark: 1024 }), + ); }; export { parse }; diff --git a/packages/csv-parse/test/api.web_stream.ts b/packages/csv-parse/test/api.web_stream.ts new file mode 100644 index 00000000..5f233b3f --- /dev/null +++ b/packages/csv-parse/test/api.web_stream.ts @@ -0,0 +1,31 @@ +import assert from 'assert'; +import {parse as parseStream} from '../lib/stream.js' + +describe('API Web Stream', () => { + + describe('stream/web/TransformStream', () => { + + it('simple parse', async () => { + const stream = parseStream(); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + await writer.write(Buffer.from("A,B,C\nD,E,F")); + await writer.close(); + + assert.deepStrictEqual(await reader.read(), { + done: false, + value: ['A', 'B', 'C'], + }); + assert.deepStrictEqual(await reader.read(), { + done: false, + value: ['D', 'E', 'F'], + }); + assert.deepStrictEqual(await reader.read(), { + done: true, + value: undefined, + }); + }) + }) +}) From 6888830198a39380b4dfc782d75f3d308dfc5549 Mon Sep 17 00:00:00 2001 From: uasan Date: Wed, 20 Nov 2024 01:20:18 +0100 Subject: [PATCH 3/6] feat(csv-parse): added errors handle to TransformStream --- packages/csv-parse/lib/stream.js | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/csv-parse/lib/stream.js b/packages/csv-parse/lib/stream.js index 70b1273a..a9ed71d2 100644 --- a/packages/csv-parse/lib/stream.js +++ b/packages/csv-parse/lib/stream.js @@ -1,4 +1,3 @@ -import { TransformStream } from "node:stream/web"; import { transform } from "./api/index.js"; const parse = (opts) => { @@ -20,10 +19,20 @@ const parse = (opts) => { controller = ctr; }, transform(chunk) { - api.parse(chunk, false, enqueue, terminate); + const error = api.parse(chunk, false, enqueue, terminate); + + if (error) { + controller.error(error); + throw error; + } }, flush() { - api.parse(undefined, true, enqueue, terminate); + const error = api.parse(undefined, true, enqueue, terminate); + + if (error) { + controller.error(error); + throw error; + } }, }, new CountQueuingStrategy({ highWaterMark: 1024 }), From d33a61a8a575f8a48545fc7e20fc56221ad339dc Mon Sep 17 00:00:00 2001 From: David Worms Date: Wed, 20 Nov 2024 10:14:52 +0100 Subject: [PATCH 4/6] test(csv-parse): error thrown by webstream --- packages/csv-parse/test/api.web_stream.ts | 28 +++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/packages/csv-parse/test/api.web_stream.ts b/packages/csv-parse/test/api.web_stream.ts index 5f233b3f..2337f481 100644 --- a/packages/csv-parse/test/api.web_stream.ts +++ b/packages/csv-parse/test/api.web_stream.ts @@ -1,5 +1,6 @@ -import assert from 'assert'; +import 'should' import {parse as parseStream} from '../lib/stream.js' +import { CsvError } from '../lib/index.js' describe('API Web Stream', () => { @@ -7,25 +8,38 @@ describe('API Web Stream', () => { it('simple parse', async () => { const stream = parseStream(); - const writer = stream.writable.getWriter(); const reader = stream.readable.getReader(); - await writer.write(Buffer.from("A,B,C\nD,E,F")); await writer.close(); - - assert.deepStrictEqual(await reader.read(), { + await reader.read().should.finally.eql({ done: false, value: ['A', 'B', 'C'], }); - assert.deepStrictEqual(await reader.read(), { + await reader.read().should.finally.eql({ done: false, value: ['D', 'E', 'F'], }); - assert.deepStrictEqual(await reader.read(), { + await reader.read().should.finally.eql({ done: true, value: undefined, }); }) + + it("cat error parse", async function () { + const stream = parseStream(); + const writer = stream.writable.getWriter(); + try { + await writer.write(Buffer.from("A,B,C\nD,E")); + await writer.close(); + throw Error("Shall not be called"); + } catch (err) { + if (!(err instanceof CsvError)) { + throw Error("Invalid error type"); + } + err.code.should.eql("CSV_RECORD_INCONSISTENT_FIELDS_LENGTH"); + } + }); + }) }) From ecaf9465be7ebe55696f0a56809ababebb9cd8ec Mon Sep 17 00:00:00 2001 From: David Worms Date: Wed, 20 Nov 2024 10:39:17 +0100 Subject: [PATCH 5/6] build(csv-parse): exclude web_stream ts test from ci in node 14 --- packages/csv-parse/package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/csv-parse/package.json b/packages/csv-parse/package.json index 5bb3f6b7..5e6fd678 100644 --- a/packages/csv-parse/package.json +++ b/packages/csv-parse/package.json @@ -133,7 +133,7 @@ "preversion": "npm run build && git add dist", "pretest": "npm run build", "test": "mocha 'test/**/*.{coffee,ts}'", - "test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'" + "test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.web_stream.ts --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'" }, "type": "module", "types": "dist/esm/index.d.ts", @@ -153,4 +153,4 @@ ] } } -} \ No newline at end of file +} From 1f763d02aab0388d4fffe8c5c8feb227d9ae4b77 Mon Sep 17 00:00:00 2001 From: David Worms Date: Wed, 20 Nov 2024 10:40:47 +0100 Subject: [PATCH 6/6] fix(csv-parse): satisfy test and re-habilitate node stream import --- packages/csv-parse/lib/stream.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/csv-parse/lib/stream.js b/packages/csv-parse/lib/stream.js index a9ed71d2..d50d3e10 100644 --- a/packages/csv-parse/lib/stream.js +++ b/packages/csv-parse/lib/stream.js @@ -1,18 +1,15 @@ +import { TransformStream, CountQueuingStrategy } from "node:stream/web"; import { transform } from "./api/index.js"; const parse = (opts) => { const api = transform(opts); - let controller; - const enqueue = (record) => { controller.enqueue(record); }; - const terminate = () => { controller.terminate(); }; - return new TransformStream( { start(ctr) {