
Conversation

@prmoore77 (Contributor)

Addresses #1107

github-actions bot modified the milestone: ADBC Libraries 22 (Dec 15, 2025)
@prmoore77 changed the title from "Implement support for Bulk Ingest for ADBC Flight SQL driver" to "feat: Implement support for Bulk Ingest for ADBC Flight SQL driver" (Dec 15, 2025)

@lidavidm (Member)

We currently use docker-compose to manage this and I'd rather keep it consistent...

@prmoore77 (Contributor, Author)

Hi @lidavidm, I've changed the code to use docker-compose, per your feedback.

@prmoore77 requested a review from lidavidm (December 15, 2025 21:42)
@prmoore77 (Contributor, Author)

FWIW, I tested the locally built driver wheel from Python against a remote (Azure) GizmoSQL server, and it seems to work well:

import os
import time

import duckdb
from adbc_driver_flightsql import dbapi as gizmosql
from codetiming import Timer
from dotenv import load_dotenv

from config import get_logger

# Timer logging setup
TIMER_TEXT = "{name}: Elapsed time: {:.4f} seconds"


def main():
    load_dotenv()

    logger = get_logger()
    timer_logger = logger.info
    with Timer(name=f"Overall program",
               text=TIMER_TEXT,
               initial_text=True,
               logger=timer_logger
               ):
        with Timer(name=f"  Generate TPCH data and load into DuckDB (1GB)",
                   text=TIMER_TEXT,
                   initial_text=True,
                   logger=timer_logger
                   ):
            # Connect to DuckDB (memory only)
            duckdb_conn = duckdb.connect()
            duckdb_conn.install_extension("tpch")
            duckdb_conn.load_extension("tpch")
            duckdb_conn.execute(query="CALL dbgen(sf=1.0)")

        with Timer(name=f"  Get RecordBatch reader for the DuckDB lineitem table",
                   text=TIMER_TEXT,
                   initial_text=True,
                   logger=timer_logger
                   ):
            lineitem_arrow_reader = duckdb_conn.table("lineitem").fetch_arrow_reader(batch_size=10_000)

        with Timer(name=f"  Bulk ingest the data into GizmoSQL",
                   text=TIMER_TEXT,
                   initial_text=True,
                   logger=timer_logger
                   ):
            with gizmosql.connect(
                    uri="grpc+tls://try-gizmosql-adbc.gizmodata.com:31337",
                    db_kwargs={"username": os.environ["GIZMOSQL_USERNAME"],
                               "password": os.environ["GIZMOSQL_PASSWORD"]
                               },
                    autocommit=True
            ).cursor() as cursor:
                ingest_start = time.perf_counter()
                rows_loaded = cursor.adbc_ingest(
                    table_name="bulk_ingest_lineitem",
                    data=lineitem_arrow_reader,
                    mode="replace"
                )
                ingest_seconds = time.perf_counter() - ingest_start

                rows_per_sec = (rows_loaded / ingest_seconds) if ingest_seconds > 0 else float("inf")
                logger.info(msg=f"Loaded rows: {rows_loaded:,}")
                logger.info(msg=f"Ingest time: {ingest_seconds:.4f} s")
                logger.info(msg=f"Rows/sec: {rows_per_sec:,.2f}")


if __name__ == "__main__":
    main()

Result:

2025-12-16 13:39:36,290 - INFO     Timer Overall program started
2025-12-16 13:39:36,290 - INFO     Timer   Generate TPCH data and load into DuckDB (1GB) started
2025-12-16 13:39:38,723 - INFO       Generate TPCH data and load into DuckDB (1GB): Elapsed time: 2.4328 seconds
2025-12-16 13:39:38,723 - INFO     Timer   Get RecordBatch reader for the DuckDB lineitem table started
2025-12-16 13:39:38,726 - INFO       Get RecordBatch reader for the DuckDB lineitem table: Elapsed time: 0.0029 seconds
2025-12-16 13:39:38,726 - INFO     Timer   Bulk ingest the data into GizmoSQL started
2025-12-16 13:40:11,055 - INFO     Loaded rows: 6,001,215
2025-12-16 13:40:11,056 - INFO     Ingest time: 33.3162 s
2025-12-16 13:40:11,056 - INFO     Rows/sec: 180,129.18
2025-12-16 13:40:11,058 - INFO       Bulk ingest the data into GizmoSQL: Elapsed time: 33.9063 seconds
2025-12-16 13:40:11,058 - INFO     Overall program: Elapsed time: 36.3427 seconds

@prmoore77 (Contributor, Author)

Well, the integration tests are failing in the pipeline. They "worked on my laptop", but I'll investigate what is happening.

@prmoore77 (Contributor, Author)

> Well, the integration tests are failing in the pipeline. They "worked on my laptop", but I'll investigate what is happening.

I believe the integration tests related to bulk ingestion are working now.

@prmoore77 (Contributor, Author)

Hi @zeroshade, I think the integration tests are working well now. I do see some CI failures, but I believe they are unrelated to my changes.

@lidavidm changed the title from "feat: Implement support for Bulk Ingest for ADBC Flight SQL driver" to "feat(go/adbc/driver/flightsql): support bulk ingest" (Dec 29, 2025)
Comment on lines 92 to 101
// executeIngestWithReader performs the bulk ingest operation with the given record reader.
func executeIngestWithReader(
	ctx context.Context,
	client *flightsql.Client,
	rdr array.RecordReader,
	opts *flightsql.ExecuteIngestOpts,
	callOpts ...grpc.CallOption,
) (int64, error) {
	return client.ExecuteIngest(ctx, rdr, opts, callOpts...)
}

@lidavidm (Member)

This is a function with one line that is only ever used once; just inline it?
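
A minimal sketch of what the inlining might look like, assuming the wrapper's single call site already has client, ctx, rdr, opts, and callOpts in scope (the nRecords name is illustrative, not the PR's actual variable):

// Call the Flight SQL client directly at the single call site
// instead of routing through a one-line wrapper:
nRecords, err := client.ExecuteIngest(ctx, rdr, opts, callOpts...)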

rdr, err := array.NewRecordReader(batch.Schema(), []arrow.RecordBatch{batch})
if err != nil {
	return nil, adbc.Error{
		Msg: fmt.Sprintf("[Flight SQL Statement] failed to create record reader: %s", err.Error()),

@lidavidm (Member)

nit: I think there's no need to explicitly call err.Error()
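
For illustration: fmt's %s verb calls Error() on any value implementing the error interface, so the quoted line could simply pass err directly:

Msg: fmt.Sprintf("[Flight SQL Statement] failed to create record reader: %s", err),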

Comment on lines 383 to 386
case adbc.OptionKeyIngestTargetTable:
	s.query.sqlQuery = ""
	s.query.substraitPlan = nil
	s.targetTable = val

@lidavidm (Member)

We need to clear s.prepared too?

@lidavidm (Member)

setSqlQuery needs to clear targetTable
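
Taken together with the s.prepared comment above, both remarks ask for symmetric state clearing. A hedged sketch, reusing the field names from the quoted snippets; the helper functions here are hypothetical, not the driver's actual methods:

// Hypothetical helpers illustrating the symmetric invalidation:
func (s *statement) setTargetTable(val string) {
	// Entering ingest mode invalidates any previously set query
	// and any previously prepared statement.
	s.query.sqlQuery = ""
	s.query.substraitPlan = nil
	s.prepared = nil
	s.targetTable = val
}

func (s *statement) setSqlQuery(query string) {
	// Setting a SQL query must likewise leave ingest mode.
	s.targetTable = ""
	s.query.sqlQuery = query
}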

Comment on lines 29 to 30
class TestBulkIngest:
    """Test bulk ingest functionality."""

@lidavidm (Member)

nit: is there a need for the class? It's not required in pytest.

        result = reader.read_all()
        assert result.column("cnt")[0].as_py() == 5

    def test_ingest_various_types(self, gizmosql):

@lidavidm (Member)

Is this super necessary? It's more a test of gizmosql than the driver

Comment on lines 298 to 299
class TestBulkIngestDBAPI:
    """Test bulk ingest using the DBAPI interface."""

@lidavidm (Member)

Same question here. The test names all encode the difference anyway, so why add the extra layer?

@prmoore77 (Contributor, Author)

Hi @lidavidm, I've made edits per your feedback. Thanks for your help.

@prmoore77 requested a review from lidavidm (December 29, 2025 17:50)
	}
}
case adbc.OptionKeyIngestTargetTable:
	s.prepared = nil

@lidavidm (Member)

Use s.closePreparedStatement
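
A hedged sketch of the suggested change; the helper's exact signature is assumed here (it may take a context and return an error), not taken from the driver:

case adbc.OptionKeyIngestTargetTable:
	// Close the server-side prepared statement so the server can
	// release its resources, rather than just dropping the handle.
	if s.prepared != nil {
		if err := s.closePreparedStatement(ctx); err != nil {
			return err
		}
		s.prepared = nil
	}
	s.targetTable = val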

Comment on lines +2733 to +2737
func (srv *BulkIngestTestServer) GetIngestedData() []arrow.RecordBatch {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	return srv.ingestedData
}

@lidavidm (Member)

The lock doesn't really do anything here unless you also copy the slice
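
A hedged sketch of the fix: copy the slice while the lock is held, so callers cannot race with later appends through the shared backing array. The same pattern would apply to GetIngestRequests below.

func (srv *BulkIngestTestServer) GetIngestedData() []arrow.RecordBatch {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	// The lock only protects this read; returning srv.ingestedData
	// directly would still alias the slice the ingest handler appends to.
	out := make([]arrow.RecordBatch, len(srv.ingestedData))
	copy(out, srv.ingestedData)
	return out
}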

Comment on lines +2739 to +2743
func (srv *BulkIngestTestServer) GetIngestRequests() []flightsql.StatementIngest {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	return srv.ingestRequests
}

@lidavidm (Member)

Ditto here
