Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ const (

// GetCurrentCatalog implements driverbase.CurrentNamespacer.
func (c *mysqlConnectionImpl) GetCurrentCatalog() (string, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we should call c.ClearPending() at the start of these connection-level SQL methods (GetCurrentCatalog, SetCurrentCatalog, PrepareDriverInfo, GetTableSchema, GetObjects), since they can run while another reader is still active on the same session and hit busy-connection / out-of-sync errors. That is already the pattern sqlwrapper uses for statement execution before reusing the dedicated connection. See adbc-drivers/driverbase-go#56

if err := c.ClearPending(); err != nil {
return "", err
}
var database string
err := c.Db.QueryRowContext(context.Background(), "SELECT DATABASE()").Scan(&database)
err := c.Conn.QueryRowContext(context.Background(), "SELECT DATABASE()").Scan(&database)
if err != nil {
return "", c.ErrorHelper.WrapIO(err, "failed to get current database")
}
Expand All @@ -55,7 +58,10 @@ func (c *mysqlConnectionImpl) GetCurrentDbSchema() (string, error) {

// SetCurrentCatalog implements driverbase.CurrentNamespacer.
func (c *mysqlConnectionImpl) SetCurrentCatalog(catalog string) error {
_, err := c.Db.ExecContext(context.Background(), "USE "+quoteIdentifier(catalog))
if err := c.ClearPending(); err != nil {
return err
}
_, err := c.Conn.ExecContext(context.Background(), "USE "+quoteIdentifier(catalog))
return err
}

Expand All @@ -68,6 +74,9 @@ func (c *mysqlConnectionImpl) SetCurrentDbSchema(schema string) error {
}

func (c *mysqlConnectionImpl) PrepareDriverInfo(ctx context.Context, infoCodes []adbc.InfoCode) error {
if err := c.ClearPending(); err != nil {
return err
}
if c.version == "" {
var version, comment string
if err := c.Conn.QueryRowContext(ctx, "SELECT @@version, @@version_comment").Scan(&version, &comment); err != nil {
Expand All @@ -80,6 +89,9 @@ func (c *mysqlConnectionImpl) PrepareDriverInfo(ctx context.Context, infoCodes [

// GetTableSchema returns the Arrow schema for a MySQL table
func (c *mysqlConnectionImpl) GetTableSchema(ctx context.Context, catalog *string, dbSchema *string, tableName string) (schema *arrow.Schema, err error) {
if err := c.ClearPending(); err != nil {
return nil, err
}
// Struct to capture MySQL column information
type tableColumn struct {
OrdinalPosition int32
Expand Down
Loading
Loading