Skip to content

Commit 063b39b

Browse files
committed
equalpipelines irrespective of batch size
1 parent b0842ca commit 063b39b

File tree

1 file changed

+108
-37
lines changed

1 file changed

+108
-37
lines changed

pkg/engine/executor/pipeline_test.go

Lines changed: 108 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,61 +9,132 @@ import (
99
)
1010

1111
func equalPipelines(t *testing.T, p1, p2 Pipeline) {
12-
for {
13-
leftErr := p1.Read()
14-
rightErr := p2.Read()
12+
// Track batch position for each pipeline
13+
var (
14+
batch1, batch2 arrow.Record
15+
pos1, pos2 int64
16+
totalRows int64
17+
)
18+
19+
// Read initial batches
20+
err1 := p1.Read()
21+
require.NoError(t, err1)
22+
batch1, _ = p1.Value()
23+
require.NotNil(t, batch1, "batch from first pipeline is nil")
24+
25+
err2 := p2.Read()
26+
require.NoError(t, err2)
27+
batch2, _ = p2.Value()
28+
require.NotNil(t, batch2, "batch from second pipeline is nil")
1529

16-
left, _ := p1.Value()
17-
right, _ := p2.Value()
30+
checkSchema(t, batch1, batch2)
1831

19-
if leftErr != nil {
20-
require.Equal(t, leftErr, rightErr, "error mismatch")
32+
// Compare row by row until both pipelines are exhausted
33+
for {
34+
// Both pipelines are exhausted
35+
if batch1 == nil && batch2 == nil {
2136
break
2237
}
2338

24-
equalBatch(t, left, right)
39+
// One pipeline is exhausted but the other isn't - they should be equal
40+
if (batch1 == nil && batch2 != nil) || (batch1 != nil && batch2 == nil) {
41+
require.Fail(t, "pipelines have different number of rows")
42+
break
43+
}
44+
45+
// Compare current row
46+
compareRow(t, batch1, pos1, batch2, pos2, totalRows)
47+
totalRows++
48+
49+
// Advance positions
50+
pos1++
51+
pos2++
52+
53+
// If we've consumed all rows in batch1, get the next batch
54+
if pos1 >= batch1.NumRows() {
55+
pos1 = 0
56+
err1 = p1.Read()
57+
if err1 == nil {
58+
batch1, _ = p1.Value()
59+
require.NotNil(t, batch1, "batch from first pipeline is nil")
60+
} else {
61+
batch1 = nil // Mark as exhausted
62+
}
63+
}
64+
65+
// If we've consumed all rows in batch2, get the next batch
66+
if pos2 >= batch2.NumRows() {
67+
pos2 = 0
68+
err2 = p2.Read()
69+
if err2 == nil {
70+
batch2, _ = p2.Value()
71+
require.NotNil(t, batch2, "batch from second pipeline is nil")
72+
} else {
73+
batch2 = nil // Mark as exhausted
74+
}
75+
}
2576
}
2677
}
2778

28-
func equalBatch(t *testing.T, b1, b2 arrow.Record) {
29-
// Check for nil records
30-
require.NotNil(t, b1, "first record is nil")
31-
require.NotNil(t, b2, "second record is nil")
32-
33-
// rows
34-
require.Equal(t, b1.NumRows(), b2.NumRows(), "number of rows mismatch")
35-
// columns
36-
require.Equal(t, b1.NumCols(), b2.NumCols(), "number of columns mismatch")
37-
38-
for i := range b1.Columns() {
39-
// column names
40-
require.Equal(t, b1.ColumnName(i), b2.ColumnName(i), "column name mismatch")
41-
// column types
42-
require.Equal(t, b1.Column(i).DataType(), b2.Column(i).DataType(), "column type mismatch")
43-
44-
// column values
45-
left, right := b1.Column(i), b2.Column(i)
46-
for j := range left.Len() {
47-
// null check
48-
require.Equal(t, left.IsNull(j), right.IsNull(j), "null value mismatch")
49-
// isValid check
50-
require.Equal(t, left.IsValid(j), right.IsValid(j), "value mismatch")
51-
// value check
52-
require.Equal(t, left.ValueStr(j), right.ValueStr(j), "value mismatch")
79+
// compareRow compares a single row between two batches
80+
func compareRow(t *testing.T, batch1 arrow.Record, pos1 int64, batch2 arrow.Record, pos2 int64, rowNum int64) {
81+
// Assert that both batches have the same number of columns
82+
require.Equal(t, batch1.NumCols(), batch2.NumCols(), "row %d: column count differs", rowNum)
83+
84+
// Iterate over columns and compare values
85+
for j := 0; j < int(batch1.NumCols()); j++ {
86+
colName1 := batch1.ColumnName(j)
87+
col1 := batch1.Column(j)
88+
col2 := batch2.Column(j)
89+
90+
// Check if both columns are null
91+
isNull := col1.IsNull(int(pos1))
92+
require.Equal(t, isNull, col2.IsNull(int(pos2)), "row %d, column %s: null mismatch", rowNum, colName1)
93+
if isNull {
94+
continue
5395
}
96+
97+
// check validity
98+
isValid := col1.IsValid(int(pos1))
99+
require.Equal(t, isValid, col2.IsValid(int(pos2)), "row %d, column %s: validity mismatch", rowNum, colName1)
100+
if !isValid {
101+
continue
102+
}
103+
// Compare the values
104+
require.Equal(t, col1.ValueStr(int(pos1)), col2.ValueStr(int(pos2)), "row %d, column %s: value differs", rowNum, colName1)
105+
}
106+
}
107+
108+
// checkSchema validates that two records have the same schema
109+
func checkSchema(t *testing.T, a, b arrow.Record) {
110+
// Check that both records have the same number of columns
111+
require.Equal(t, a.NumCols(), b.NumCols(), "records have different number of columns")
112+
113+
// Iterate through columns and check names and data types
114+
for i := 0; i < int(a.NumCols()); i++ {
115+
require.Equal(t, a.ColumnName(i), b.ColumnName(i), "column name mismatch at index %d", i)
116+
require.Equal(t, a.Column(i).DataType(), b.Column(i).DataType(), "column data type mismatch at index %d", i)
54117
}
55118
}
56119

57120
func TestDataGenEquality(t *testing.T) {
58-
c := &Context{
59-
batchSize: 10,
121+
// Create two different contexts with different batch sizes
122+
c1 := &Context{
123+
batchSize: 10, // Small batch size
60124
}
125+
c2 := &Context{
126+
batchSize: 25, // Larger batch size
127+
}
128+
129+
// Use the same data generator for both pipelines
61130
gen := &dataGenerator{
62131
limit: 100,
63132
}
64133

65-
p1 := c.executeDataGenerator(context.Background(), gen)
66-
p2 := c.executeDataGenerator(context.Background(), gen)
134+
// Create pipelines with different batch sizes
135+
p1 := c1.executeDataGenerator(context.Background(), gen)
136+
p2 := c2.executeDataGenerator(context.Background(), gen)
67137

138+
// Compare the pipelines - should pass even with different batch sizes
68139
equalPipelines(t, p1, p2)
69140
}

0 commit comments

Comments
 (0)