Skip to content

Commit 663706f

Browse files
authored
fix(arrow): drop-row at batch boundary in RowsToRecord (#685)
The loop condition `rows.Next() && count < batchSize` advanced the underlying cursor once after the final scan; that row was silently dropped when the caller asked for the next batch. Unbounded SELECTs crossing the 1024-row batch boundary lost one row per transition (deterministic, ORDER BY-stable). COUNT(*) returned the parquet metadata count, hiding the discrepancy from aggregation queries. Reported by Marce Coll on dbt_marce.credit_purchase_events (COUNT(*) = 12617, SELECT = 12605) and dbt.usage_allocation (2,689,942 vs 2,687,758). WHERE filters still found the rows. Swap the order: `count < batchSize && rows.Next()` so Next() is not called when the batch is already full. Add regression test covering the production case.
1 parent 03ee5f7 commit 663706f

2 files changed

Lines changed: 109 additions & 1 deletion

File tree

duckdbservice/arrow_helpers.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,15 @@ func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema,
3838

3939
numFields := schema.NumFields()
4040
count := 0
41-
for rows.Next() && count < batchSize {
41+
// Order matters: check `count < batchSize` first, then call rows.Next().
42+
// The reverse (rows.Next() && count < batchSize) advances the cursor once
43+
// more after the final scan and that row is silently dropped — the next
44+
// call to RowsToRecord starts from the row *after* the one we skipped.
45+
// Production reads were losing one row at every batch boundary
46+
// (batchSize=1024) for unbounded SELECTs; COUNT(*) still returned the
47+
// parquet-metadata row count, so the discrepancy was invisible to
48+
// aggregation queries. See TestRowsToRecordNoRowsLostAtBatchBoundary.
49+
for count < batchSize && rows.Next() {
4250
values := make([]interface{}, numFields)
4351
valuePtrs := make([]interface{}, numFields)
4452
for i := range values {

duckdbservice/arrow_helpers_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,3 +1059,103 @@ func TestGetQuerySchemaTrailingSemicolon(t *testing.T) {
10591059
}
10601060
}
10611061

1062+
// TestRowsToRecordNoRowsLostAtBatchBoundary reproduces the production bug where
1063+
// RowsToRecord silently dropped one row at every batch transition for unbounded
1064+
// SELECTs. The driver-level cursor was being advanced by the loop condition
1065+
// `rows.Next() && count < batchSize` even when the batch was already full, so
1066+
// the row right after each full batch (index `batchSize`, `2*batchSize+1`, ...) never reached a Scan call
1067+
// and was lost when the caller asked for the next batch.
1068+
//
1069+
// Reported by Marce Coll on dbt_marce.credit_purchase_events
1070+
// (12617 rows reported by COUNT(*), 12605 rows delivered by SELECT) and
1071+
// dbt.usage_allocation (2,689,942 vs 2,687,758). Symptoms:
1072+
// - SELECT col FROM big_table delivers fewer rows than COUNT(*).
1073+
// - WHERE col = X still returns the missing row.
1074+
// - Number of lost rows scales linearly with table size.
1075+
func TestRowsToRecordNoRowsLostAtBatchBoundary(t *testing.T) {
1076+
alloc := memory.NewGoAllocator()
1077+
db, err := sql.Open("duckdb", "")
1078+
if err != nil {
1079+
t.Fatalf("open: %v", err)
1080+
}
1081+
defer func() { _ = db.Close() }()
1082+
1083+
// Pick row counts that exercise multiple batch transitions and a partial
1084+
// final batch. batchSize = 1024 matches the value used by DoGetStatement.
1085+
const batchSize = 1024
1086+
cases := []struct {
1087+
name string
1088+
rows int
1089+
}{
1090+
{"single batch exact", 1024},
1091+
{"one row over boundary", 1025},
1092+
{"two batches exact", 2048},
1093+
{"three batches + partial", 3500},
1094+
{"production case credit_purchase_events", 12617},
1095+
}
1096+
1097+
for _, tc := range cases {
1098+
t.Run(tc.name, func(t *testing.T) {
1099+
tbl := fmt.Sprintf("t_%d", tc.rows)
1100+
if _, err := db.Exec(fmt.Sprintf(
1101+
"CREATE TABLE %s AS SELECT i AS id FROM range(0, %d) t(i)", tbl, tc.rows,
1102+
)); err != nil {
1103+
t.Fatalf("create table: %v", err)
1104+
}
1105+
1106+
ctx := context.Background()
1107+
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT id FROM %s ORDER BY id", tbl))
1108+
if err != nil {
1109+
t.Fatalf("query: %v", err)
1110+
}
1111+
defer func() { _ = rows.Close() }()
1112+
1113+
schema, err := GetQuerySchema(ctx, db, fmt.Sprintf("SELECT id FROM %s", tbl), nil)
1114+
if err != nil {
1115+
t.Fatalf("schema: %v", err)
1116+
}
1117+
1118+
seen := make([]bool, tc.rows)
1119+
delivered := 0
1120+
for {
1121+
rec, err := RowsToRecord(alloc, rows, schema, batchSize)
1122+
if err != nil {
1123+
t.Fatalf("RowsToRecord: %v", err)
1124+
}
1125+
if rec == nil {
1126+
break
1127+
}
1128+
col := rec.Column(0).(*array.Int64)
1129+
for i := 0; i < col.Len(); i++ {
1130+
id := col.Value(i)
1131+
if id < 0 || id >= int64(tc.rows) {
1132+
rec.Release()
1133+
t.Fatalf("delivered out-of-range id %d (expected 0..%d)", id, tc.rows-1)
1134+
}
1135+
if seen[id] {
1136+
rec.Release()
1137+
t.Fatalf("id %d delivered twice", id)
1138+
}
1139+
seen[id] = true
1140+
delivered++
1141+
}
1142+
rec.Release()
1143+
}
1144+
1145+
if delivered != tc.rows {
1146+
// Walk the seen set to point at the first dropped id; this
1147+
// matches the production symptom (deterministic missing rows).
1148+
firstDropped := -1
1149+
for i, ok := range seen {
1150+
if !ok {
1151+
firstDropped = i
1152+
break
1153+
}
1154+
}
1155+
t.Fatalf("delivered %d rows, expected %d (first dropped id = %d)",
1156+
delivered, tc.rows, firstDropped)
1157+
}
1158+
})
1159+
}
1160+
}
1161+

0 commit comments

Comments
 (0)