feat(go/adbc/driver/flightsql): support bulk ingest #3808
…ht SQL driver. Addresses apache#1107
We currently use docker-compose to manage this and I'd rather keep it consistent...
Hi, @lidavidm - I've changed the code to use docker-compose, per your feedback.
Well, the integration tests are failing in the pipeline. They "worked on my laptop" - but I'll investigate to see what is happening.
I believe the integration tests related to Bulk Ingestion are working now...
hi @zeroshade - I think the integration tests are working well now. I do see some CI failures, but I think they are unrelated to my changes.
// executeIngestWithReader performs the bulk ingest operation with the given record reader.
func executeIngestWithReader(
    ctx context.Context,
    client *flightsql.Client,
    rdr array.RecordReader,
    opts *flightsql.ExecuteIngestOpts,
    callOpts ...grpc.CallOption,
) (int64, error) {
    return client.ExecuteIngest(ctx, rdr, opts, callOpts...)
}
This is a function with one line that is only ever used once; just inline it?
rdr, err := array.NewRecordReader(batch.Schema(), []arrow.RecordBatch{batch})
if err != nil {
    return nil, adbc.Error{
        Msg: fmt.Sprintf("[Flight SQL Statement] failed to create record reader: %s", err.Error()),
nit: I think there's no need to explicitly call err.Error()
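To illustrate the nit: for the `%s` and `%v` verbs, `fmt` calls `Error()` itself when the operand implements the `error` interface, so both forms produce the same string for a non-nil error. The sketch below is standalone; `errExample` and the two helper functions are hypothetical names, not code from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample is a hypothetical error used only for this illustration.
var errExample = errors.New("schema mismatch")

// formatExplicit mirrors the style under review: Error() is called by hand.
func formatExplicit(err error) string {
	return fmt.Sprintf("failed to create record reader: %s", err.Error())
}

// formatImplicit passes the error value directly; fmt invokes Error() for %s.
func formatImplicit(err error) string {
	return fmt.Sprintf("failed to create record reader: %s", err)
}

func main() {
	// Both helpers yield the same message for a non-nil error.
	fmt.Println(formatExplicit(errExample) == formatImplicit(errExample)) // true
}
```

The two forms differ only for a nil error: `err.Error()` would panic, while `%s` prints a placeholder. Inside an `if err != nil` branch that case cannot occur.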
case adbc.OptionKeyIngestTargetTable:
    s.query.sqlQuery = ""
    s.query.substraitPlan = nil
    s.targetTable = val
We need to clear s.prepared too?
setSqlQuery needs to clear targetTable
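The two comments above describe the same invariant from both sides: a statement should be in at most one mode (query, prepared, or ingest) at a time, so each setter has to clear the others. A minimal sketch of that idea, assuming a heavily simplified statement type; `stmt`, `reset`, `SetSqlQuery`, and `SetIngestTarget` are hypothetical names, and the `prepared` flag only stands in for a real prepared-statement handle that the driver would need to close rather than just zero.

```go
package main

import "fmt"

// stmt is a hypothetical, simplified stand-in for the driver's statement state.
type stmt struct {
	sqlQuery    string
	prepared    bool // placeholder for a server-side prepared statement handle
	targetTable string
}

// reset clears every mode so that exactly one can be set afterwards.
// A real driver would release the prepared statement here, not just drop it.
func (s *stmt) reset() {
	s.sqlQuery = ""
	s.prepared = false
	s.targetTable = ""
}

// SetSqlQuery switches the statement to query mode.
func (s *stmt) SetSqlQuery(q string) {
	s.reset()
	s.sqlQuery = q
}

// SetIngestTarget switches the statement to bulk-ingest mode.
func (s *stmt) SetIngestTarget(table string) {
	s.reset()
	s.targetTable = table
}

func main() {
	s := &stmt{}
	s.SetSqlQuery("SELECT 1")
	s.SetIngestTarget("my_table")
	fmt.Printf("%q %q\n", s.sqlQuery, s.targetTable) // "" "my_table"
	s.SetSqlQuery("SELECT 2")
	fmt.Printf("%q %q\n", s.sqlQuery, s.targetTable) // "SELECT 2" ""
}
```

Routing every setter through one reset helper means a newly added mode only has to be cleared in one place.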
class TestBulkIngest:
    """Test bulk ingest functionality."""
nit: is there need for the class? It's not required in pytest
        result = reader.read_all()
        assert result.column("cnt")[0].as_py() == 5

    def test_ingest_various_types(self, gizmosql):
Is this super necessary? It's more a test of gizmosql than the driver
class TestBulkIngestDBAPI:
    """Test bulk ingest using the DBAPI interface."""
Same question here. The test names all encode the difference anyways, so why add the extra layer?
hi @lidavidm - I've made edits per your feedback. Thanks for your help.
        }
    }
case adbc.OptionKeyIngestTargetTable:
    s.prepared = nil
Use s.closePreparedStatement
func (srv *BulkIngestTestServer) GetIngestedData() []arrow.RecordBatch {
    srv.mu.Lock()
    defer srv.mu.Unlock()
    return srv.ingestedData
}
The lock doesn't really do anything here unless you also copy the slice
func (srv *BulkIngestTestServer) GetIngestRequests() []flightsql.StatementIngest {
    srv.mu.Lock()
    defer srv.mu.Unlock()
    return srv.ingestRequests
}
Ditto here
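To make the locking point concrete: returning the slice itself hands the caller a view onto the same backing array, so any later read or write through it happens outside the mutex. Copying under the lock gives the caller an independent snapshot. The sketch below uses a hypothetical `store` type holding `int`s instead of the test server and its record batches; with reference-counted Arrow batches, copying the slice would still share the batches themselves, which this sketch does not address.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for the test server's mutex-guarded state.
type store struct {
	mu    sync.Mutex
	items []int
}

// Add appends a value while holding the lock.
func (s *store) Add(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, v)
}

// Aliased returns the internal slice. The lock is released on return,
// so the caller shares the backing array with no synchronization.
func (s *store) Aliased() []int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.items
}

// Snapshot copies the slice while holding the lock, so the result
// is independent of later changes to the store.
func (s *store) Snapshot() []int {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]int, len(s.items))
	copy(out, s.items)
	return out
}

func main() {
	s := &store{}
	s.Add(1)
	s.Add(2)
	aliased := s.Aliased()
	snap := s.Snapshot()
	aliased[0] = 99 // writes through to s.items without holding the lock
	fmt.Println(s.Snapshot()[0], snap[0]) // 99 1
}
```

The copy costs one allocation per call, which is usually acceptable for a test helper that is read a handful of times.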


Addresses #1107