Skip to content

Commit bf1abca

Browse files
fix: cap max ingestion batch size (adbc-drivers#71)
## What's Changed

Calculates maxBatchSize taking into account the max placeholder limit and the number of columns, and caps the default or user-provided config accordingly. Closes adbc-drivers#70.

---------

Co-authored-by: Mandukhai Alimaa <114253933+Mandukhai-Alimaa@users.noreply.github.com>
1 parent 1b2bb6a commit bf1abca

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

go/connection.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
const (
3232
// Default num of rows per batch for batched INSERT
3333
MySQLDefaultIngestBatchSize = 1000
34+
// MySQL's maximum number of placeholders in a prepared statement
35+
MySQLMaxPlaceholders = 65535
3436
)
3537

3638
// GetCurrentCatalog implements driverbase.CurrentNamespacer.
@@ -228,9 +230,14 @@ func (c *mysqlConnectionImpl) ExecuteBulkIngest(ctx context.Context, conn *sqlwr
228230
driverbase.OptionKeyIngestBatchSize)
229231
}
230232

231-
// Set MySQL-specific default batch size if user hasn't overridden
233+
// Set MySQL-specific default batch size if user hasn't overridden,
234+
// capping to stay within MySQL's 65,535 placeholder limit.
235+
numCols := len(schema.Fields())
236+
maxBatchSize := MySQLMaxPlaceholders / numCols
232237
if options.IngestBatchSize == 0 {
233-
options.IngestBatchSize = MySQLDefaultIngestBatchSize
238+
options.IngestBatchSize = min(MySQLDefaultIngestBatchSize, maxBatchSize)
239+
} else if options.IngestBatchSize > maxBatchSize {
240+
options.IngestBatchSize = maxBatchSize
234241
}
235242

236243
return sqlwrapper.ExecuteBatchedBulkIngest(

go/mysql_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,63 @@ func (s *MySQLTestSuite) TearDownSuite() {
669669
s.mem.AssertSize(s.T(), 0)
670670
}
671671

672+
func (s *MySQLTestSuite) TestBulkIngestManyColumns() {
673+
const numCols = 100
674+
const numRows = 5
675+
tableName := "bulk_ingest_wide"
676+
677+
// Drop the table if it exists
678+
s.NoError(s.stmt.SetSqlQuery("DROP TABLE IF EXISTS `" + tableName + "`"))
679+
_, err := s.stmt.ExecuteUpdate(s.ctx)
680+
s.Require().NoError(err)
681+
682+
// Build a schema with 100 int64 columns
683+
fields := make([]arrow.Field, numCols)
684+
for i := range numCols {
685+
fields[i] = arrow.Field{
686+
Name: fmt.Sprintf("col_%d", i), Type: arrow.PrimitiveTypes.Int64, Nullable: true,
687+
}
688+
}
689+
schema := arrow.NewSchema(fields, nil)
690+
691+
// Build a record batch with a few rows
692+
batchbldr := array.NewRecordBuilder(s.mem, schema)
693+
defer batchbldr.Release()
694+
for col := range numCols {
695+
bldr := batchbldr.Field(col).(*array.Int64Builder)
696+
for row := range numRows {
697+
bldr.Append(int64(col*numRows + row))
698+
}
699+
}
700+
batch := batchbldr.NewRecordBatch()
701+
defer batch.Release()
702+
703+
// Ingest — this would fail before the fix because
704+
// 1000 (default batch size) * 100 columns = 100,000 placeholders > 65,535 limit
705+
stmt, err := s.cnxn.NewStatement()
706+
s.Require().NoError(err)
707+
defer func() { s.NoError(stmt.Close()) }()
708+
709+
s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, tableName))
710+
s.Require().NoError(stmt.Bind(s.ctx, batch))
711+
712+
affected, err := stmt.ExecuteUpdate(s.ctx)
713+
s.Require().NoError(err)
714+
if affected != -1 {
715+
s.EqualValues(numRows, affected)
716+
}
717+
718+
// Verify the data was ingested correctly
719+
s.Require().NoError(stmt.SetSqlQuery("SELECT COUNT(*) FROM `" + tableName + "`"))
720+
rdr, _, err := stmt.ExecuteQuery(s.ctx)
721+
s.Require().NoError(err)
722+
defer rdr.Release()
723+
724+
s.Require().True(rdr.Next())
725+
count := rdr.RecordBatch().Column(0).(*array.Int64).Value(0)
726+
s.EqualValues(numRows, count)
727+
}
728+
672729
func TestMySQLTypeTests(t *testing.T) {
673730
dsn := os.Getenv("MYSQL_DSN")
674731
if dsn == "" {

0 commit comments

Comments (0)