Skip to content

Commit c7a1025

Browse files
authored
Merge pull request #12 from palcarazm/fix/backoff
fix: backoff pressure
2 parents e60d751 + 1a2c658 commit c7a1025

File tree

4 files changed

+34
-31
lines changed

4 files changed

+34
-31
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "batchjs-data",
3-
"version": "1.0.0",
3+
"version": "1.0.1",
44
"author": {
55
"name": "Pablo Alcaraz Martínez",
66
"url": "https://github.com/palcarazm/"
@@ -79,7 +79,7 @@
7979
"sqlite3": "^5.1.7"
8080
},
8181
"dependencies": {
82-
"batchjs": "^1.1.0"
82+
"batchjs": "^1.1.3"
8383
},
8484
"commitlint": {
8585
"extends": "@commitlint/config-conventional"

src/common/classes/AbstractBatchEntityReaderStream.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,26 @@ export abstract class AbstractBatchEntityReaderStream<T> extends ObjectReadable
3838
* If an error occurs while reading data, it emits an error event to the stream.
3939
*
4040
* @param {number} [size] - The size parameter for controlling the read operation.
41-
* @returns {Promise<void>} A promise that resolves when the data has been read and pushed to the consumer stream.
4241
*/
43-
async _read(size: number): Promise<void> {
42+
_read(size: number): void {
4443
if (this.reading) return;
44+
4545
this.reading = true;
46-
try {
47-
const entities: T[] = await this.fetch(Math.min(size, this.batchSize));
48-
if (entities.length === 0) {
49-
this.push(null);
50-
}else{
51-
this.buffer.push(...entities);
52-
await this._flush();
53-
}
54-
} catch (error) {
55-
this.emit("error", error as Error);
56-
}finally{
57-
this.reading = false;
58-
}
46+
47+
this.fetch(Math.min(size, this.batchSize))
48+
.then((entities) => {
49+
if (entities.length === 0) {
50+
this.push(null);
51+
} else {
52+
this.buffer.push(...entities);
53+
this._flush().finally(() => {
54+
this.reading = false;
55+
});
56+
}
57+
})
58+
.catch((error) => {
59+
this.emit("error", error);
60+
});
5961
}
6062

6163
/**
@@ -65,12 +67,12 @@ export abstract class AbstractBatchEntityReaderStream<T> extends ObjectReadable
6567
* @private
6668
* @returns {Promise<void>} A promise that resolves when the buffer is flushed.
6769
*/
68-
private async _flush():Promise<void>{
70+
private _flush():Promise<void>{
6971
while (this.buffer.length > 0) {
7072
const chunk = this.buffer.shift() as T;
7173
if (!this.push(chunk)) {
72-
this.buffer.unshift(chunk);
73-
await new Promise<void>((resolve) => this.once("drain", resolve));
74+
this.once("drain", () => this._flush());
75+
return Promise.resolve();
7476
}
7577
}
7678
return Promise.resolve();

src/common/classes/AbstractBatchEntityWriterStream.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,17 @@ export abstract class AbstractBatchEntityWriterStream<T> extends ObjectWritable
3535
* @param {T} chunk - The data chunk to write to the stream.
3636
* @param {BufferEncoding} encoding - The encoding of the data.
3737
* @param {WriteCallback} callback - The callback function to be executed after writing the data.
38-
* @return {Promise<void>} This function does not return anything.
3938
*/
40-
async _write(chunk: T, encoding: BufferEncoding, callback: WriteCallback): Promise<void> {
41-
try {
39+
_write(chunk: T, encoding: BufferEncoding, callback: WriteCallback): void {
4240
this.buffer.push(chunk);
4341
if (this.buffer.length >= this.batchSize) {
44-
await this._flush();
42+
this._flush()
43+
.then(() => callback())
44+
.catch((error) => callback(error));
45+
} else {
46+
callback();
4547
}
46-
callback();
47-
} catch (error) {
48-
callback(error as Error);
49-
}
50-
}
48+
}
5149

5250
/**
5351
* Finalizes the stream by pushing remaining data batches, handling errors,

test/common/classes/AbstractBatchEntityReaderStream.test.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ describe("AbstractBatchEntityReaderStream", () => {
6363
result.push(chunk);
6464

6565
if (result.length === 2) {
66-
spy.mockImplementationOnce(() => false);
66+
spy.mockImplementationOnce((data) => {
67+
reader.emit('data',data);
68+
return false;
69+
});
6770
setTimeout(()=>{
6871
reader.emit("drain");
6972
},50);
@@ -72,7 +75,7 @@ describe("AbstractBatchEntityReaderStream", () => {
7275

7376
reader.on("end", () => {
7477
expect(result).toEqual(data);
75-
expect(spy).toHaveBeenCalledTimes(data.length + 2);
78+
expect(spy).toHaveBeenCalledTimes(data.length + 1);
7679
done();
7780
});
7881

0 commit comments

Comments
 (0)