diff --git a/csharp/src/StatementExecution/StatementExecutionClient.cs b/csharp/src/StatementExecution/StatementExecutionClient.cs index ddc52d34..d4796921 100644 --- a/csharp/src/StatementExecution/StatementExecutionClient.cs +++ b/csharp/src/StatementExecution/StatementExecutionClient.cs @@ -22,6 +22,7 @@ using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; +using Apache.Arrow.Adbc; namespace AdbcDrivers.Databricks.StatementExecution { @@ -403,7 +404,19 @@ private async Task EnsureSuccessStatusCodeAsync(HttpResponseMessage response) errorMessage = $"{errorMessage}. Response: {errorContent}"; } - throw new DatabricksException(errorMessage); + var statusCode = response.StatusCode switch + { + System.Net.HttpStatusCode.Unauthorized or System.Net.HttpStatusCode.Forbidden + => AdbcStatusCode.Unauthorized, + System.Net.HttpStatusCode.NotFound + => AdbcStatusCode.NotFound, + System.Net.HttpStatusCode.Conflict + => AdbcStatusCode.AlreadyExists, + System.Net.HttpStatusCode.BadRequest + => AdbcStatusCode.InvalidArgument, + _ => AdbcStatusCode.IOError, + }; + throw new DatabricksException(errorMessage, statusCode); } } } diff --git a/csharp/src/StatementExecution/StatementExecutionConnection.cs b/csharp/src/StatementExecution/StatementExecutionConnection.cs index 303ecb68..7c865b79 100644 --- a/csharp/src/StatementExecution/StatementExecutionConnection.cs +++ b/csharp/src/StatementExecution/StatementExecutionConnection.cs @@ -212,6 +212,9 @@ private StatementExecutionConnection( if (_enableMultipleCatalogSupport) { properties.TryGetValue(AdbcOptions.Connection.CurrentCatalog, out _catalog); + // Match Thrift behavior: SPARK is a legacy alias — map it to null so the + // runtime falls back to the workspace default (typically hive_metastore). + _catalog = DatabricksConnection.HandleSparkCatalog(_catalog); } properties.TryGetValue(AdbcOptions.Connection.CurrentDbSchema, out _schema); @@ -382,8 +385,22 @@ public async Task OpenAsync(CancellationToken cancellationToken = default) SessionConfigs = sessionConfigs.Count > 0 ? sessionConfigs : null }; - var response = await _client.CreateSessionAsync(request, cancellationToken).ConfigureAwait(false); - _sessionId = response.SessionId; + try + { + var response = await _client.CreateSessionAsync(request, cancellationToken).ConfigureAwait(false); + _sessionId = response.SessionId; + } + catch (DatabricksException) + { + throw; + } + catch (Exception ex) + { + throw new DatabricksException( + $"Failed to connect to Databricks: {ex.GetBaseException().Message}", + AdbcStatusCode.IOError, + ex); + } // If user didn't specify a catalog, discover the server's default. // In Thrift, the server returns this in OpenSessionResp.InitialNamespace. @@ -424,6 +441,18 @@ public override AdbcStatement CreateStatement() this); // Pass connection as TracingConnection for tracing support } + public override void SetOption(string key, string? value) + { + switch (key) + { + case AdbcOptions.Telemetry.TraceParent: + SetTraceParent(string.IsNullOrWhiteSpace(value) ? null : value); + return; + } + + base.SetOption(key, value); + } + public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? schemaPattern, string? tableNamePattern, IReadOnlyList? tableTypes, string? columnNamePattern) { return this.TraceActivity(activity => @@ -434,6 +463,13 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata activity?.SetTag("table_pattern", tableNamePattern ?? "(none)"); activity?.SetTag("column_pattern", columnNamePattern ?? "(none)"); + // Databricks identifiers are case-insensitive — lowercase patterns + // to match server behavior (same as DatabricksConnection/Thrift path). + catalogPattern = catalogPattern?.ToLower(); + schemaPattern = schemaPattern?.ToLower(); + tableNamePattern = tableNamePattern?.ToLower(); + columnNamePattern = columnNamePattern?.ToLower(); + using var cts = CreateMetadataTimeoutCts(); return GetObjectsResultBuilder.BuildGetObjectsResultAsync( this, depth, catalogPattern, schemaPattern, @@ -496,10 +532,12 @@ public override Schema GetTableSchema(string? catalog, string? dbSchema, string activity?.SetTag("table_name", tableName); using var cts = CreateMetadataTimeoutCts(); - string sql = new ShowColumnsCommand( - ResolveEffectiveCatalog(catalog), dbSchema, tableName).Build(); - activity?.SetTag("sql_query", sql); - var batches = ExecuteMetadataSql(sql, cts.Token); + // Pass catalog through with SPARK→null normalization, matching Thrift + // which sends catalog as-is to the server. ExecuteShowColumnsAsync + // handles null by iterating all catalogs. + string? resolvedCatalog = DatabricksConnection.HandleSparkCatalog(catalog); + var batches = ExecuteShowColumnsAsync(resolvedCatalog, dbSchema, tableName, null, cts.Token) + .GetAwaiter().GetResult(); var fields = new List(); foreach (var batch in batches) @@ -559,7 +597,7 @@ async Task> IGetObjectsDataProvider.GetCatalogsAsync(strin string sql = new ShowSchemasCommand(catalogPattern, schemaPattern).Build(); var batches = await ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); - // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: catalog, databaseName + // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: databaseName, catalog // SHOW SCHEMAS IN `catalog` returns 1 column: databaseName bool showSchemasInAllCatalogs = catalogPattern == null; @@ -571,8 +609,8 @@ async Task> IGetObjectsDataProvider.GetCatalogsAsync(strin if (showSchemasInAllCatalogs) { - catalogArray = batch.Column(0) as StringArray; - schemaArray = batch.Column(1) as StringArray; + schemaArray = batch.Column(0) as StringArray; + catalogArray = batch.Column(1) as StringArray; } else { @@ -634,8 +672,7 @@ async Task IGetObjectsDataProvider.PopulateColumnInfoAsync(string? catalogPatter Dictionary>> catalogMap, CancellationToken cancellationToken) { - string sql = new ShowColumnsCommand(catalogPattern, schemaPattern, tablePattern, columnPattern).Build(); - var batches = await ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); + var batches = await ExecuteShowColumnsAsync(catalogPattern, schemaPattern, tablePattern, columnPattern, cancellationToken).ConfigureAwait(false); var tablePositions = new Dictionary(); @@ -678,24 +715,6 @@ async Task IGetObjectsDataProvider.PopulateColumnInfoAsync(string? catalogPatter { ColumnMetadataHelper.PopulateTableInfoFromTypeName( tableInfo, colName, colType, position, nullable); - - // Match Thrift GetObjects behavior: SparkConnection.SetPrecisionScaleAndTypeName - // only sets Precision/Scale for DECIMAL, NUMERIC, CHAR, NCHAR, VARCHAR, - // NVARCHAR, LONGVARCHAR, LONGNVARCHAR. All other types get null. - int lastIdx = tableInfo.Precision.Count - 1; - short typeCode = tableInfo.ColType[lastIdx]; - if (typeCode != (short)HiveServer2Connection.ColumnTypeId.DECIMAL - && typeCode != (short)HiveServer2Connection.ColumnTypeId.NUMERIC - && typeCode != (short)HiveServer2Connection.ColumnTypeId.CHAR - && typeCode != (short)HiveServer2Connection.ColumnTypeId.NCHAR - && typeCode != (short)HiveServer2Connection.ColumnTypeId.VARCHAR - && typeCode != (short)HiveServer2Connection.ColumnTypeId.NVARCHAR - && typeCode != (short)HiveServer2Connection.ColumnTypeId.LONGVARCHAR - && typeCode != (short)HiveServer2Connection.ColumnTypeId.LONGNVARCHAR) - { - tableInfo.Precision[lastIdx] = null; - tableInfo.Scale[lastIdx] = null; - } } } } @@ -736,6 +755,49 @@ internal List ExecuteMetadataSql(string sql, CancellationToken canc return ExecuteMetadataSqlAsync(sql, cancellationToken).GetAwaiter().GetResult(); } + /// + /// Executes a SHOW COLUMNS command. When catalog is null, iterates over all catalogs + /// since SHOW COLUMNS IN ALL CATALOGS is not yet supported by the backend. + /// + internal async Task> ExecuteShowColumnsAsync( + string? catalog, string? schemaPattern, string? tablePattern, string? columnPattern, + CancellationToken cancellationToken) + { + if (catalog != null) + { + string sql = new ShowColumnsCommand(catalog, schemaPattern, tablePattern, columnPattern).Build(); + return await ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); + } + + // SHOW COLUMNS IN ALL CATALOGS is not supported — iterate over each catalog. + // TODO: Remove this fallback when the backend supports SHOW COLUMNS IN ALL CATALOGS. + var allBatches = new List(); + string catalogsSql = new ShowCatalogsCommand(null).Build(); + var catalogBatches = await ExecuteMetadataSqlAsync(catalogsSql, cancellationToken).ConfigureAwait(false); + + foreach (var batch in catalogBatches) + { + var catalogArray = batch.Column(0) as StringArray; + if (catalogArray == null) continue; + for (int i = 0; i < catalogArray.Length; i++) + { + if (catalogArray.IsNull(i)) continue; + string cat = catalogArray.GetString(i); + string sql = new ShowColumnsCommand(cat, schemaPattern, tablePattern, columnPattern).Build(); + try + { + var batches = await ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); + allBatches.AddRange(batches); + } + catch + { + // Skip catalogs we can't access (permission errors) + } + } + } + return allBatches; + } + internal bool EnablePKFK => _enablePKFK; internal bool EnableMultipleCatalogSupport => _enableMultipleCatalogSupport; @@ -748,30 +810,10 @@ internal List ExecuteMetadataSql(string sql, CancellationToken canc internal bool UseDescTableExtended => _useDescTableExtended; /// - /// Resolves the effective catalog for metadata queries. - /// SEA SHOW commands require an explicit catalog name in the SQL string - /// (e.g., SHOW SCHEMAS IN `catalog`), unlike Thrift which treats null - /// as "use session default." So we must always resolve to a concrete value. - /// When EnableMultipleCatalogSupport is true: uses the provided catalog, - /// falling back to the connection default catalog. - /// When EnableMultipleCatalogSupport is false: resolves via the session's - /// current catalog (SELECT CURRENT_CATALOG()) since _catalog is null - /// when the flag is false (matching Thrift behavior). + /// Returns the session's default catalog. Used by statements when + /// enableMultipleCatalogSupport=false and no catalog was specified. /// - internal string? ResolveEffectiveCatalog(string? requestedCatalog) - { - string? normalized = MetadataUtilities.NormalizeSparkCatalog(requestedCatalog); - - if (_enableMultipleCatalogSupport) - { - return normalized ?? _catalog; - } - - // flag=false: if user specified an explicit non-null catalog, it won't - // match the default — the statement layer should return empty. - // If null/SPARK, resolve via server query. - return normalized ?? GetCurrentCatalog(); - } + internal string? GetSessionDefaultCatalog() => GetCurrentCatalog(); /// /// Queries the server for the current catalog via SELECT CURRENT_CATALOG(). diff --git a/csharp/src/StatementExecution/StatementExecutionStatement.cs b/csharp/src/StatementExecution/StatementExecutionStatement.cs index a489e367..2c19d2f8 100644 --- a/csharp/src/StatementExecution/StatementExecutionStatement.cs +++ b/csharp/src/StatementExecution/StatementExecutionStatement.cs @@ -124,6 +124,11 @@ public StatementExecutionStatement( _lz4BufferPool = lz4BufferPool ?? throw new ArgumentNullException(nameof(lz4BufferPool)); _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); _enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport; + + // Match Thrift: statement starts with connection's default catalog. + // When enableMultipleCatalogSupport=true, this is the catalog from config (e.g. "main"). + // When false, _catalog is null (not set from config), matching Thrift behavior. + _metadataCatalogName = catalog; } /// @@ -189,6 +194,10 @@ public override void SetOption(string key, string value) case DatabricksParameters.MaxBytesPerFetchRequest: break; + case AdbcOptions.Telemetry.TraceParent: + SetTraceParent(string.IsNullOrEmpty(value) ? null : value); + break; + default: base.SetOption(key, value); break; @@ -295,8 +304,8 @@ public async Task ExecuteQueryAsync( // Get schema from reader var schema = reader.Schema; - // Return query result - use -1 if row count is not available - long rowCount = response.Manifest?.TotalRowCount ?? -1; + // Return query result - use 0 if row count is not available + long rowCount = response.Manifest?.TotalRowCount ?? 0; return new QueryResult(rowCount, reader); } @@ -616,7 +625,7 @@ public async Task ExecuteUpdateAsync(CancellationToken cancellatio throw new AdbcException("Statement was closed before results could be retrieved"); } - // For updates, we don't need to read the results - just return the row count + // For updates, we don't need to read the results - just return the row count. long rowCount = response.Manifest?.TotalRowCount ?? 0; return new UpdateResult(rowCount); } @@ -792,7 +801,35 @@ private static async Task FetchAllChunksAsync( // Metadata command routing - private string? EffectiveCatalog => _connection.ResolveEffectiveCatalog(_metadataCatalogName); + /// + /// Resolves the catalog for metadata SQL commands. + /// Matches Thrift behavior: + /// - SPARK → null (all catalogs) + /// - Other values pass through as-is + /// - null stays null + /// When enableMultipleCatalogSupport=false and result is null, + /// resolves to the session default catalog (SEA SQL requires an explicit + /// catalog for SHOW commands when not querying all catalogs). + /// + // TODO: Once the backend supports SHOW COLUMNS IN ALL CATALOGS, the + // ExecuteShowColumnsAsync iterate-all-catalogs fallback can be removed. + private string? EffectiveCatalog + { + get + { + // Normalize SPARK → null, same as Thrift's HandleSparkCatalog + string? catalog = DatabricksConnection.HandleSparkCatalog(_metadataCatalogName); + + if (_connection.EnableMultipleCatalogSupport) + { + // null means "all catalogs" (e.g. SHOW SCHEMAS IN ALL CATALOGS) + return catalog; + } + + // flag=false: null means use session default (SEA SQL needs explicit catalog) + return catalog ?? _connection.GetSessionDefaultCatalog(); + } + } /// /// Escapes wildcard characters (_ and %) in metadata name parameters when @@ -837,7 +874,9 @@ private async Task GetCatalogsAsync(CancellationToken cancellationT return new QueryResult(1, new HiveInfoArrowStream(catalogSchema, new IArrowArray[] { sparkBuilder.Build() })); } - string sql = new ShowCatalogsCommand(EscapePatternWildcardsInName(_metadataCatalogName)).Build(); + // GetCatalogs returns all catalogs — no filtering by pattern, + // matching Thrift behavior (Thrift RPC has no catalog filter for GetCatalogs). + string sql = new ShowCatalogsCommand(null).Build(); activity?.SetTag("sql_query", sql); var batches = await _connection.ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); @@ -883,7 +922,7 @@ private async Task GetSchemasAsync(CancellationToken cancellationTo activity?.SetTag("sql_query", sql); var batches = await _connection.ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); - // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: catalog_name, databaseName + // SHOW SCHEMAS IN ALL CATALOGS returns 2 columns: databaseName, catalog // SHOW SCHEMAS IN `catalog` returns 1 column: databaseName bool showAllCatalogs = catalog == null; @@ -897,8 +936,8 @@ private async Task GetSchemasAsync(CancellationToken cancellationTo if (showAllCatalogs) { - catalogArray = batch.Column(0) as StringArray; - schemaArray = batch.Column(1) as StringArray; + schemaArray = batch.Column(0) as StringArray; + catalogArray = batch.Column(1) as StringArray; } else { @@ -1024,13 +1063,12 @@ private async Task GetColumnsAsync(CancellationToken cancellationTo return FlatColumnsResultBuilder.BuildFlatColumnsResult( System.Array.Empty<(string, string, string, TableInfo)>()); - string sql = new ShowColumnsCommand( + var batches = await _connection.ExecuteShowColumnsAsync( catalog, EscapePatternWildcardsInName(_metadataSchemaName), EscapePatternWildcardsInName(_metadataTableName), - EscapePatternWildcardsInName(_metadataColumnName)).Build(); - activity?.SetTag("sql_query", sql); - var batches = await _connection.ExecuteMetadataSqlAsync(sql, cancellationToken).ConfigureAwait(false); + EscapePatternWildcardsInName(_metadataColumnName), + cancellationToken).ConfigureAwait(false); var tableInfos = new Dictionary();