
Commit 7ee7b9b

feat: optimize buffer management for duplicate key handling
1 parent 51d3a75 commit 7ee7b9b

File tree: 5 files changed (+149 additions, -63 deletions)


README.md

Lines changed: 13 additions & 11 deletions
@@ -331,7 +331,6 @@ const stats = await diff({
     newSource: './tests/b.csv',
     keys: ['id'],
     duplicateKeyHandling: 'keepFirstRow', // or 'keepLastRow'
-    duplicateRowBufferSize: 2000,
 }).to('console');
 console.log(stats);
 ```
@@ -349,6 +348,8 @@ const stats = await diff({
 console.log(stats);
 ```
 
+Note that you can specify the size of the buffer if you know that the number of duplicates cannot exceed it; otherwise you can enable the **duplicateRowBufferOverflow** option,
+which will remove the first entries of the buffer when it exceeds the allocated capacity, to avoid a failure.
 
 ### Order 2 CSV files and diff them on the console
 
@@ -580,16 +581,17 @@ sortDirection| no | ASC | specifies if the column is sorted in ascen
 
 ### Differ options
 
-Name                  |Required|Default value|Description
-----------------------|--------|-------------|-----------
-oldSource             | yes    |             | either a string filename, a URL or a SourceOptions
-newSource             | yes    |             | either a string filename, a URL or a SourceOptions
-keys                  | yes    |             | the list of columns that form the primary key. This is required for comparing the rows. A key can be a string name or a {ColumnDefinition}
-includedColumns       | no     |             | the list of columns to keep from the input sources. If not specified, all columns are selected.
-excludedColumns       | no     |             | the list of columns to exclude from the input sources.
-rowComparer           | no     |             | specifies a custom row comparer.
-duplicateKeyHandling  | no     | fail        | specifies how to handle duplicate rows in a source. It will fail by default and throw a UniqueKeyViolationError exception. But you can ignore, keep the first or last row, or even provide your own function that will receive the duplicates and select the best candidate.
-duplicateRowBufferSize| no     | 1000        | specifies the maximum size of the buffer used to accumulate duplicate rows.
+Name                      |Required|Default value|Description
+--------------------------|--------|-------------|-----------
+oldSource                 | yes    |             | either a string filename, a URL or a SourceOptions
+newSource                 | yes    |             | either a string filename, a URL or a SourceOptions
+keys                      | yes    |             | the list of columns that form the primary key. This is required for comparing the rows. A key can be a string name or a {ColumnDefinition}
+includedColumns           | no     |             | the list of columns to keep from the input sources. If not specified, all columns are selected.
+excludedColumns           | no     |             | the list of columns to exclude from the input sources.
+rowComparer               | no     |             | specifies a custom row comparer.
+duplicateKeyHandling      | no     | fail        | specifies how to handle duplicate rows in a source. It will fail by default and throw a UniqueKeyViolationError exception. But you can ignore, keep the first or last row, or even provide your own function that will receive the duplicates and select the best candidate.
+duplicateRowBufferSize    | no     | 1000        | specifies the maximum size of the buffer used to accumulate duplicate rows.
+duplicateRowBufferOverflow| no     | false       | specifies if we can remove the first entries of the buffer to continue adding new duplicate entries when reaching maximum capacity, to avoid throwing an error and halting the process.
 
 ### diff function
 
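The new option composes with the existing ones as in this minimal usage sketch, assuming the `diff` entry point shown in the README excerpts above (the import name is inferred from the package name, and the file paths are placeholders):

```ts
import { diff } from 'tabular-data-differ';

// Sketch: tolerate arbitrarily long runs of duplicate keys by letting the
// duplicate buffer overflow instead of throwing 'Too many duplicate rows'.
const stats = await diff({
    oldSource: './tests/a.csv',   // placeholder path
    newSource: './tests/b.csv',   // placeholder path
    keys: ['id'],
    // a custom handler receives the buffered duplicates and picks one winner
    duplicateKeyHandling: (rows) => rows[rows.length - 1],
    duplicateRowBufferSize: 100,      // keep at most 100 duplicates in memory
    duplicateRowBufferOverflow: true, // drop the oldest entries past that cap
}).to('console');
console.log(stats);
```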

package-lock.json

Lines changed: 2 additions & 2 deletions
Generated file; diff not rendered.

package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
     "name": "tabular-data-differ",
-    "version": "1.1.2",
+    "version": "1.1.3",
     "description": "A very efficient library for diffing two sorted streams of tabular data, such as CSV files.",
     "keywords": [
         "table",

src/differ.test.ts

Lines changed: 68 additions & 1 deletion
@@ -328,6 +328,73 @@ describe('differ', () => {
             [ '3', 'dave bis', '444' ]
         ]);
     });
+    test('should detect duplicate keys and call aggregate function, with buffer overflow', async () => {
+        const dups = [];
+        for (let i = 0; i < 100; i++) {
+            dups.push(`3,dave bis${i},444`);
+        }
+        let duplicateRows: Row[] = [];
+        const duplicateKeyHandler: DuplicateKeyHandler = (rows) => {
+            if (duplicateRows.length === 0) {
+                duplicateRows = rows;
+            }
+            return rows[rows.length - 1];
+        };
+        const writer = await diffStrings({
+            oldLines: [
+                'ID,NAME,AGE',
+                '1,john,33',
+                '2,rachel,22',
+                '3,dave,44',
+                ...dups,
+                '4,noemie,11',
+            ],
+            newLines: [
+                'ID,NAME,AGE',
+                '1,john,33',
+                '2,rachel,22',
+                '3,dave,44',
+            ],
+            keys: ['ID'],
+            duplicateKeyHandling: duplicateKeyHandler,
+            duplicateRowBufferOverflow: true,
+            duplicateRowBufferSize: 10,
+            keepSameRows: true,
+        });
+        expect(writer.diffs).toEqual([
+            {
+                delta: 0,
+                status: 'same',
+                oldRow: [ '1', 'john', '33' ],
+                newRow: [ '1', 'john', '33' ]
+            },
+            {
+                delta: 0,
+                status: 'same',
+                oldRow: [ '2', 'rachel', '22' ],
+                newRow: [ '2', 'rachel', '22' ]
+            },
+            {
+                delta: 0,
+                status: 'modified',
+                oldRow: [ '3', 'dave bis99', '444' ],
+                newRow: [ '3', 'dave', '44' ]
+            },
+            { delta: -1, status: 'deleted', oldRow: [ '4', 'noemie', '11' ] }
+        ]);
+        expect(duplicateRows).toEqual([
+            [ '3', 'dave bis90', '444' ],
+            [ '3', 'dave bis91', '444' ],
+            [ '3', 'dave bis92', '444' ],
+            [ '3', 'dave bis93', '444' ],
+            [ '3', 'dave bis94', '444' ],
+            [ '3', 'dave bis95', '444' ],
+            [ '3', 'dave bis96', '444' ],
+            [ '3', 'dave bis97', '444' ],
+            [ '3', 'dave bis98', '444' ],
+            [ '3', 'dave bis99', '444' ]
+        ]);
+    });
     test('should detect duplicate keys and throw an error when the buffer exceeds the limit', async () => {
         const dups = [];
         for (let i = 0; i < 10; i++) {
@@ -349,7 +416,7 @@ describe('differ', () => {
                 '3,dave,44',
             ],
             keys: ['ID'],
-            duplicateKeyHandling: 'keepLastRow',
+            duplicateKeyHandling: (rows) => rows[0],
             duplicateRowBufferSize: 5,
             keepSameRows: true,
         })).rejects.toThrowError('Too many duplicate rows');
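For reference, the overflow test above feeds 101 rows sharing the key '3' (the original '3,dave,44' plus the 100 generated 'dave bisN' rows) through a buffer capped at 10 entries, so the differ repeatedly discards the oldest buffered duplicate and the custom handler only ever sees the last ten rows, 'dave bis90' through 'dave bis99'.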

src/differ.ts

Lines changed: 65 additions & 48 deletions
@@ -210,10 +210,21 @@ export interface DifferOptions {
     duplicateKeyHandling?: DuplicateKeyHandling;
     /**
      * specifies the maximum size of the buffer used to accumulate duplicate rows.
+     * Note that the buffer size matters only when you provide a custom function to duplicateKeyHandling, since it will receive the accumulated duplicates
+     * as an input parameter.
      * @default 1000
      * @see duplicateKeyHandling
      */
     duplicateRowBufferSize?: number;
+    /**
+     * specifies if we can remove the first entries of the buffer to continue adding new duplicate entries when reaching maximum capacity,
+     * to avoid throwing an error and halting the process.
+     * Note that the buffer size matters only when you provide a custom function to duplicateKeyHandling, since it will receive the accumulated duplicates
+     * as an input parameter.
+     * @default false
+     * @see duplicateRowBufferSize
+     */
+    duplicateRowBufferOverflow?: boolean;
 }
 
 /**
@@ -373,7 +384,7 @@ export class DifferContext {
         this.newSource = new BufferedFormatReader(createSource(options.newSource));
         this.comparer = options.rowComparer ?? defaultRowComparer;
         this.duplicateKeyHandling = options.duplicateKeyHandling ?? 'fail';
-        this.duplicateRowBufferSize = options.duplicateRowBufferSize ?? 1000;
+        this.duplicateRowBufferSize = Math.max(5, options.duplicateRowBufferSize ?? 1000);
     }
 
     /**
@@ -598,12 +609,63 @@ export class DifferContext {
         return result;
     }
 
+    async getNextRow(source: BufferedFormatReader): Promise<Row | undefined> {
+        const row = await source.readRow();
+        if (!row) {
+            return row;
+        }
+        if (this.duplicateKeyHandling === 'fail') {
+            // Note that it will be further processed in ensureRowsAreInAscendingOrder
+            return row;
+        }
+        const nextRow = await source.peekRow();
+        if (!nextRow) {
+            return row;
+        }
+        let isDuplicate = this.comparer(this.keys, nextRow, row) === 0;
+        if (isDuplicate) {
+            const duplicateRows: Row[] = [];
+            duplicateRows.push(row);
+            while (isDuplicate) {
+                const duplicateRow = await source.readRow();
+                if (duplicateRow) {
+                    if (this.duplicateKeyHandling !== 'keepFirstRow') {
+                        // we don't need to accumulate duplicate rows when we just have to return the first row!
+                        duplicateRows.push(duplicateRow);
+                    }
+                    if (this.duplicateKeyHandling === 'keepLastRow') {
+                        // we don't need to accumulate the previous rows when we just have to return the last row!
+                        duplicateRows.shift();
+                    }
+                    if (duplicateRows.length > this.duplicateRowBufferSize) {
+                        if (this.options.duplicateRowBufferOverflow) {
+                            // remove the first entry when we can overflow
+                            duplicateRows.shift();
+                        } else {
+                            throw new Error('Too many duplicate rows');
+                        }
+                    }
+                }
+                const nextRow = await source.peekRow();
+                isDuplicate = !!nextRow && this.comparer(this.keys, nextRow, row) === 0;
+            }
+            if (this.duplicateKeyHandling === 'keepFirstRow') {
+                return duplicateRows[0];
+            }
+            if (this.duplicateKeyHandling === 'keepLastRow') {
+                return duplicateRows[duplicateRows.length - 1];
+            }
+            return this.duplicateKeyHandling(duplicateRows);
+        }
+        return row;
+    }
+
     private getNextOldRow(): Promise<Row | undefined> {
-        return getNextRow(this.oldSource, this.duplicateKeyHandling, this.comparer, this.keys, this.duplicateRowBufferSize);
+        return this.getNextRow(this.oldSource);
     }
 
     private getNextNewRow(): Promise<Row | undefined> {
-        return getNextRow(this.newSource, this.duplicateKeyHandling, this.comparer, this.keys, this.duplicateRowBufferSize);
+        return this.getNextRow(this.newSource);
     }
 
     private async getNextPair():Promise<RowPair> {
@@ -674,48 +736,3 @@ export function sameArrays(a: string[], b: string[]) {
     }
     return true;
 }
-
-async function getNextRow(
-    source: BufferedFormatReader,
-    duplicateKeyHandling: DuplicateKeyHandling,
-    comparer: RowComparer,
-    keys: Column[],
-    duplicateRowBufferSize: number,
-): Promise<Row | undefined> {
-    const row = await source.readRow();
-    if (!row) {
-        return row;
-    }
-    if (duplicateKeyHandling === 'fail') {
-        // Note that it will be further processed in ensureRowsAreInAscendingOrder
-        return row;
-    }
-    const nextRow = await source.peekRow();
-    if (!nextRow) {
-        return row;
-    }
-    let isDuplicate = comparer(keys, nextRow, row) === 0;
-    if (isDuplicate) {
-        const duplicateRows: Row[] = [];
-        duplicateRows.push(row);
-        while (isDuplicate) {
-            const duplicateRow = await source.readRow();
-            if (duplicateRow) {
-                duplicateRows.push(duplicateRow);
-                if (duplicateRows.length > duplicateRowBufferSize) {
-                    throw new Error('Too many duplicate rows');
-                }
-            }
-            const nextRow = await source.peekRow();
-            isDuplicate = !!nextRow && comparer(keys, nextRow, row) === 0;
-        }
-        if (duplicateKeyHandling === 'keepFirstRow') {
-            return duplicateRows[0];
-        }
-        if (duplicateKeyHandling === 'keepLastRow') {
-            return duplicateRows[duplicateRows.length - 1];
-        }
-        return duplicateKeyHandling(duplicateRows);
-    }
-    return row;
-}
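Since duplicateKeyHandling also accepts a custom DuplicateKeyHandler (as the tests above demonstrate), here is a minimal sketch of one; the selection rule and the handler name are illustrative, not part of the library:

```ts
// Illustrative handler: among the buffered duplicates, keep the row with the
// largest numeric value in its third column (the AGE column in the examples).
// With duplicateRowBufferOverflow enabled, `rows` holds at most
// duplicateRowBufferSize entries, so this scan stays bounded.
const keepHighestAge: DuplicateKeyHandler = (rows) =>
    rows.reduce((best, row) => (Number(row[2]) > Number(best[2]) ? row : best));
```

Note that the keepFirstRow and keepLastRow policies never let the buffer grow past a single row, so duplicateRowBufferSize and duplicateRowBufferOverflow only come into play with a custom handler, as the new JSDoc comments state.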
