
feat(go): use load data statement for bulk ingestion#79

Open
shuprime wants to merge 1 commit into adbc-drivers:main from shuprime:feat/load-data-ingestion

Conversation

@shuprime shuprime commented Mar 14, 2026

What's Changed

This PR optimizes bulk data ingestion in the MySQL driver by implementing support for LOAD DATA LOCAL INFILE.

Technical Details

  • Infile Capability Check: Added a check for @@local_infile at the start of ingestion to ensure server support.
  • Streaming Implementation: Leverages io.Pipe and gomysql.RegisterReaderHandler to stream data directly. This avoids writing temporary files to disk.
  • TSV Formatting: Implemented a row-based TSV encoder in csv_helper.go that handles basic escaping (backslashes, tabs, newlines) and MySQL-specific NULL representation (\N).
  • Fallback Logic: If the server does not support local data loading, the driver falls back to the original ExecuteBatchedBulkIngest logic.
  • Concurrency: Uses an atomic counter to generate unique registration names for the reader handler, intended to prevent collisions during simultaneous ingestions.
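The TSV formatting described above can be illustrated with a minimal, self-contained sketch (not the PR's actual `csv_helper.go` code; names are illustrative). With `FIELDS TERMINATED BY '\t' ESCAPED BY '\\' LINES TERMINATED BY '\n'`, MySQL interprets `\t` and `\n` escape sequences on load, and `\N` as NULL:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeTSVField backslash-escapes the characters that would
// otherwise be read as field/line terminators or escape markers.
func escapeTSVField(s string) string {
	r := strings.NewReplacer(
		"\\", "\\\\",
		"\t", "\\t",
		"\n", "\\n",
	)
	return r.Replace(s)
}

// encodeTSVRow joins one row's fields with tabs; nil pointers
// become MySQL's NULL marker \N.
func encodeTSVRow(fields []*string) string {
	parts := make([]string, len(fields))
	for i, f := range fields {
		if f == nil {
			parts[i] = `\N`
		} else {
			parts[i] = escapeTSVField(*f)
		}
	}
	return strings.Join(parts, "\t") + "\n"
}

func main() {
	v := "a\tb"
	fmt.Printf("%q\n", encodeTSVRow([]*string{&v, nil}))
}
```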

Testing

  • Added integration tests in mysql_ingest_test.go to verify data integrity for large batches and various Arrow types.
  • Included a test case to confirm successful fallback when local_infile is disabled.

Benchmark

  • MySQL server details
    • Instance Type: Cloud SQL for MySQL (Enterprise Plus edition)
    • CPU: 4 vCPUs
    • Memory: 32 GB
    • Storage: 250 GB SSD
  • VM details on which benchmarking was done
    • Machine Type: c4-standard-2 (2 vCPUs, 7 GB Memory)
    • Architecture: x86/64
    • CPU Platform: Intel Emerald Rapids
  • Dataset used
    [image: dataset and benchmark results]

Closes #78

@shuprime shuprime requested a review from lidavidm as a code owner March 14, 2026 07:16

@Mandukhai-Alimaa Mandukhai-Alimaa left a comment


Great work on this! Just a few things to address.

func (c *mysqlConnectionImpl) executeLoadDataIngest(ctx context.Context, conn *sqlwrapper.LoggingConn, options *driverbase.BulkIngestOptions, stream array.RecordReader) (int64, error) {
r, w := io.Pipe()
readerId := loadReaderCounter.Add(1)
readerName := fmt.Sprintf("adbc_ingest_%s_%d", options.TableName, readerId)

This could be a reader-name injection/quoting risk: options.TableName is user-controlled, and if the table name contains quotes or special characters, the query breaks. We can probably just use the counter without the user input (the table name).
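The counter-only naming the comment suggests could be sketched like this (a hedged sketch, not the PR's code; the function name is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// loadReaderCounter issues process-unique IDs so concurrent
// ingestions never register colliding reader-handler names.
var loadReaderCounter atomic.Int64

// nextReaderName derives the handler name from the counter alone,
// keeping the user-controlled table name out of the identifier.
func nextReaderName() string {
	return fmt.Sprintf("adbc_ingest_%d", loadReaderCounter.Add(1))
}

func main() {
	fmt.Println(nextReaderName())
	fmt.Println(nextReaderName())
}
```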

Comment on lines +312 to +315
res, err := conn.ExecContext(ctx, query)
if err != nil {
return -1, c.ErrorHelper.WrapIO(err, "failed to execute LOAD DATA statement")
}

If ExecContext fails, the function returns, but the CSV writer goroutine keeps running and will block forever once the pipe buffer fills, which leaks resources. We can add r.Close() before returning to unblock the writer, and wrap the goroutine in a cancellable context.

colsList := strings.Join(colNames, ", ")

query := fmt.Sprintf(
"LOAD DATA LOCAL INFILE 'Reader::%s' INTO TABLE %s CHARACTER SET utf8mb4 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (%s)",

CHARACTER SET utf8mb4 makes MySQL treat the incoming stream as UTF-8. If we ever ingest non-UTF-8 bytes (or binary-ish data), could this cause conversion issues? Do we need this here, or should we drop it or make it optional?

rows, rowCount := it.CurrentBatch()

buf.Reset()
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {

Suggested change
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
for rowIdx := range rowCount {


buf.Reset()
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
for colIdx := 0; colIdx < numCols; colIdx++ {

Suggested change
for colIdx := 0; colIdx < numCols; colIdx++ {
for colIdx := range numCols {

Comment on lines +216 to +220
schema := arrow.NewSchema([]arrow.Field{
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
{Name: "float", Type: arrow.PrimitiveTypes.Float64},
{Name: "bin", Type: arrow.BinaryTypes.Binary},
}, nil)

Can we add more types to this test to improve coverage? E.g. timestamp, date32, int8/16/64, uint32, decimal128?

Development

Successfully merging this pull request may close: Optimize Bulk Ingestion with LOAD DATA LOCAL INFILE (#78)