-
Notifications
You must be signed in to change notification settings - Fork 12
fix(csharp): fix SEA protocol parity issues for metadata, errors, and catalog handling #338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
daf1f36
1d03e34
e81dc75
da40538
307a0a5
89d283e
c9847c0
f7694ed
81a779a
edcd170
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -210,6 +210,9 @@ private StatementExecutionConnection( | |
| if (_enableMultipleCatalogSupport) | ||
| { | ||
| properties.TryGetValue(AdbcOptions.Connection.CurrentCatalog, out _catalog); | ||
| // Match Thrift behavior: SPARK is a legacy alias — map it to null so the | ||
| // runtime falls back to the workspace default (typically hive_metastore). | ||
| _catalog = DatabricksConnection.HandleSparkCatalog(_catalog); | ||
| } | ||
| properties.TryGetValue(AdbcOptions.Connection.CurrentDbSchema, out _schema); | ||
|
|
||
|
|
@@ -380,8 +383,22 @@ public async Task OpenAsync(CancellationToken cancellationToken = default) | |
| SessionConfigs = sessionConfigs.Count > 0 ? sessionConfigs : null | ||
| }; | ||
|
|
||
| var response = await _client.CreateSessionAsync(request, cancellationToken).ConfigureAwait(false); | ||
| _sessionId = response.SessionId; | ||
| try | ||
| { | ||
| var response = await _client.CreateSessionAsync(request, cancellationToken).ConfigureAwait(false); | ||
| _sessionId = response.SessionId; | ||
| } | ||
| catch (DatabricksException) | ||
| { | ||
| throw; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| throw new DatabricksException( | ||
| $"Failed to connect to Databricks: {ex.GetBaseException().Message}", | ||
| AdbcStatusCode.IOError, | ||
| ex); | ||
| } | ||
|
|
||
| // If user didn't specify a catalog, discover the server's default. | ||
| // In Thrift, the server returns this in OpenSessionResp.InitialNamespace. | ||
|
|
@@ -422,6 +439,18 @@ public override AdbcStatement CreateStatement() | |
| this); // Pass connection as TracingConnection for tracing support | ||
| } | ||
|
|
||
| public override void SetOption(string key, string? value) | ||
| { | ||
| switch (key) | ||
| { | ||
| case AdbcOptions.Telemetry.TraceParent: | ||
| SetTraceParent(string.IsNullOrWhiteSpace(value) ? null : value); | ||
eric-wang-1990 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return; | ||
| } | ||
|
|
||
| base.SetOption(key, value); | ||
| } | ||
|
|
||
| public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? schemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern) | ||
| { | ||
| return this.TraceActivity(activity => | ||
|
|
@@ -432,6 +461,13 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata | |
| activity?.SetTag("table_pattern", tableNamePattern ?? "(none)"); | ||
| activity?.SetTag("column_pattern", columnNamePattern ?? "(none)"); | ||
|
|
||
| // Databricks identifiers are case-insensitive — lowercase patterns | ||
| // to match server behavior (same as DatabricksConnection/Thrift path). | ||
| catalogPattern = catalogPattern?.ToLower(); | ||
| schemaPattern = schemaPattern?.ToLower(); | ||
| tableNamePattern = tableNamePattern?.ToLower(); | ||
| columnNamePattern = columnNamePattern?.ToLower(); | ||
|
|
||
| using var cts = CreateMetadataTimeoutCts(); | ||
| return GetObjectsResultBuilder.BuildGetObjectsResultAsync( | ||
| this, depth, catalogPattern, schemaPattern, | ||
|
|
@@ -557,7 +593,7 @@ async Task<IReadOnlyList<string>> IGetObjectsDataProvider.GetCatalogsAsync(strin | |
| string sql = new ShowSchemasCommand(catalogPattern, schemaPattern).Build(); | ||
| var batches = await ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: catalog, databaseName | ||
| // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: databaseName, catalog | ||
| // SHOW SCHEMAS IN `catalog` returns 1 column: databaseName | ||
| bool showSchemasInAllCatalogs = catalogPattern == null; | ||
|
|
||
|
|
@@ -569,8 +605,8 @@ async Task<IReadOnlyList<string>> IGetObjectsDataProvider.GetCatalogsAsync(strin | |
|
|
||
| if (showSchemasInAllCatalogs) | ||
| { | ||
| catalogArray = batch.Column(0) as StringArray; | ||
| schemaArray = batch.Column(1) as StringArray; | ||
| schemaArray = batch.Column(0) as StringArray; | ||
| catalogArray = batch.Column(1) as StringArray; | ||
| } | ||
| else | ||
| { | ||
|
|
@@ -678,20 +714,25 @@ async Task IGetObjectsDataProvider.PopulateColumnInfoAsync(string? catalogPatter | |
| tableInfo, colName, colType, position, nullable); | ||
|
|
||
| // Match Thrift GetObjects behavior: SparkConnection.SetPrecisionScaleAndTypeName | ||
| // only sets Precision/Scale for DECIMAL, NUMERIC, CHAR, NCHAR, VARCHAR, | ||
| // NVARCHAR, LONGVARCHAR, LONGNVARCHAR. All other types get null. | ||
| // sets Precision for DECIMAL, NUMERIC, CHAR, NCHAR, VARCHAR, NVARCHAR, | ||
| // LONGVARCHAR, LONGNVARCHAR. Sets Scale only for DECIMAL/NUMERIC. | ||
| // All other types get null for both. | ||
| int lastIdx = tableInfo.Precision.Count - 1; | ||
| short typeCode = tableInfo.ColType[lastIdx]; | ||
| if (typeCode != (short)HiveServer2Connection.ColumnTypeId.DECIMAL | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.NUMERIC | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.CHAR | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.NCHAR | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.VARCHAR | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.NVARCHAR | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.LONGVARCHAR | ||
| && typeCode != (short)HiveServer2Connection.ColumnTypeId.LONGNVARCHAR) | ||
| bool isDecimalOrNumeric = typeCode == (short)HiveServer2Connection.ColumnTypeId.DECIMAL | ||
|
||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.NUMERIC; | ||
| bool isCharType = typeCode == (short)HiveServer2Connection.ColumnTypeId.CHAR | ||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.NCHAR | ||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.VARCHAR | ||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.NVARCHAR | ||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.LONGVARCHAR | ||
| || typeCode == (short)HiveServer2Connection.ColumnTypeId.LONGNVARCHAR; | ||
| if (!isDecimalOrNumeric && !isCharType) | ||
| { | ||
| tableInfo.Precision[lastIdx] = null; | ||
| } | ||
| if (!isDecimalOrNumeric) | ||
| { | ||
| tableInfo.Scale[lastIdx] = null; | ||
| } | ||
| } | ||
|
|
@@ -767,6 +808,12 @@ internal List<RecordBatch> ExecuteMetadataSql(string sql, CancellationToken canc | |
| /// <summary> | ||
| /// Queries the server for the current catalog via SELECT CURRENT_CATALOG(). | ||
| /// </summary> | ||
| /// <summary> | ||
| /// Returns the session's default catalog. Used by statements when | ||
| /// enableMultipleCatalogSupport=false and no catalog was specified. | ||
| /// </summary> | ||
| internal string? GetSessionDefaultCatalog() => GetCurrentCatalog(); | ||
eric-wang-1990 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private string? GetCurrentCatalog() | ||
| { | ||
| var batches = ExecuteMetadataSql("SELECT CURRENT_CATALOG()"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,6 +124,11 @@ public StatementExecutionStatement( | |
| _lz4BufferPool = lz4BufferPool ?? throw new ArgumentNullException(nameof(lz4BufferPool)); | ||
| _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); | ||
| _enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport; | ||
|
|
||
| // Match Thrift: statement starts with connection's default catalog. | ||
| // When enableMultipleCatalogSupport=true, this is the catalog from config (e.g. "main"). | ||
| // When false, _catalog is null (not set from config), matching Thrift behavior. | ||
| _metadataCatalogName = catalog; | ||
| } | ||
|
|
||
| /// <summary> | ||
|
|
@@ -189,6 +194,10 @@ public override void SetOption(string key, string value) | |
| case DatabricksParameters.MaxBytesPerFetchRequest: | ||
| break; | ||
|
|
||
| case AdbcOptions.Telemetry.TraceParent: | ||
| SetTraceParent(string.IsNullOrEmpty(value) ? null : value); | ||
| break; | ||
|
|
||
| default: | ||
| base.SetOption(key, value); | ||
| break; | ||
|
|
@@ -615,8 +624,9 @@ public async Task<UpdateResult> ExecuteUpdateAsync(CancellationToken cancellatio | |
| throw new AdbcException("Statement was closed before results could be retrieved"); | ||
| } | ||
|
|
||
| // For updates, we don't need to read the results - just return the row count | ||
| long rowCount = response.Manifest?.TotalRowCount ?? 0; | ||
| // For updates, we don't need to read the results - just return the row count. | ||
| // Default to -1 (unknown) when no manifest/row count, matching Thrift behavior for DDL. | ||
| long rowCount = response.Manifest?.TotalRowCount ?? -1; | ||
| return new UpdateResult(rowCount); | ||
| } | ||
|
|
||
|
|
@@ -791,7 +801,33 @@ private static async Task<byte[]> FetchAllChunksAsync( | |
|
|
||
| // Metadata command routing | ||
|
|
||
| private string? EffectiveCatalog => _connection.ResolveEffectiveCatalog(_metadataCatalogName); | ||
| /// <summary> | ||
| /// Resolves the catalog for metadata SQL commands. | ||
| /// Matches Thrift behavior: | ||
| /// - SPARK → null (all catalogs) | ||
| /// - Other values pass through as-is | ||
| /// - null stays null | ||
| /// When enableMultipleCatalogSupport=false and result is null, | ||
| /// resolves to the session default catalog (SEA SQL requires an explicit | ||
| /// catalog for SHOW commands when not querying all catalogs). | ||
| /// </summary> | ||
| private string? EffectiveCatalog | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I remember you already have a similar function like this?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes — The reason we can't reuse The statement's
The connection's
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it just SHOW COLUMNS IN ALL CATALOGS not permitted?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We support SHOW TABLES IN ALL CATALOGS. For SHOW COLUMNS, the PR is merged in runtime but not yet released
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK so when that is released we can change the connection-level ResolveEffectiveCatalog to use this one? Let's add a comment if so.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point. Added a TODO in f7694ed. Once the backend supports |
||
| { | ||
| get | ||
| { | ||
| // Normalize SPARK → null, same as Thrift's HandleSparkCatalog | ||
| string? catalog = DatabricksConnection.HandleSparkCatalog(_metadataCatalogName); | ||
|
|
||
| if (_connection.EnableMultipleCatalogSupport) | ||
| { | ||
| // null means "all catalogs" (e.g. SHOW SCHEMAS IN ALL CATALOGS) | ||
| return catalog; | ||
| } | ||
|
|
||
| // flag=false: null means use session default (SEA SQL needs explicit catalog) | ||
| return catalog ?? _connection.GetSessionDefaultCatalog(); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Escapes wildcard characters (_ and %) in metadata name parameters when | ||
|
|
@@ -836,7 +872,9 @@ private async Task<QueryResult> GetCatalogsAsync(CancellationToken cancellationT | |
| return new QueryResult(1, new HiveInfoArrowStream(catalogSchema, new IArrowArray[] { sparkBuilder.Build() })); | ||
| } | ||
|
|
||
| string sql = new ShowCatalogsCommand(EscapePatternWildcardsInName(_metadataCatalogName)).Build(); | ||
| // GetCatalogs returns all catalogs — no filtering by pattern, | ||
| // matching Thrift behavior (Thrift RPC has no catalog filter for GetCatalogs). | ||
| string sql = new ShowCatalogsCommand(null).Build(); | ||
| activity?.SetTag("sql_query", sql); | ||
| var batches = await _connection.ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); | ||
|
|
||
|
|
@@ -882,7 +920,7 @@ private async Task<QueryResult> GetSchemasAsync(CancellationToken cancellationTo | |
| activity?.SetTag("sql_query", sql); | ||
| var batches = await _connection.ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: catalog_name, databaseName | ||
| // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: databaseName, catalog | ||
| // SHOW SCHEMAS IN `catalog` returns 1 column: databaseName | ||
| bool showAllCatalogs = catalog == null; | ||
|
|
||
|
|
@@ -896,8 +934,8 @@ private async Task<QueryResult> GetSchemasAsync(CancellationToken cancellationTo | |
|
|
||
| if (showAllCatalogs) | ||
| { | ||
| catalogArray = batch.Column(0) as StringArray; | ||
| schemaArray = batch.Column(1) as StringArray; | ||
| schemaArray = batch.Column(0) as StringArray; | ||
| catalogArray = batch.Column(1) as StringArray; | ||
| } | ||
| else | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.