8 changes: 5 additions & 3 deletions docs/en/data_source/catalog/hive_catalog.md
@@ -38,7 +38,9 @@ To ensure successful SQL workloads on your Hive cluster, your StarRocks cluster
- Parquet and ORC files support the following compression formats: NO_COMPRESSION, SNAPPY, LZ4, ZSTD, and GZIP.
- Textfile files support the NO_COMPRESSION compression format.

You can use the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to specify the compression algorithm used for sinking data to Hive tables.
You can use the table property [`compression_codec`](../../data_source/catalog/hive_catalog.md#properties) or the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to specify the compression algorithm used for sinking data to Hive tables.

When writing to a Hive table, StarRocks preferentially uses the compression codec specified in the table's properties to compress the written data. If the table does not define one, StarRocks falls back to the compression algorithm set in the session variable `connector_sink_compression_codec`.
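 
A minimal sketch of how the two settings interact, assuming a Hive catalog named `hive_catalog` and hypothetical database, table, and column names:

```SQL
-- Session-level fallback codec.
SET connector_sink_compression_codec = 'zstd';

-- The table property takes precedence: data written to this table is compressed with SNAPPY.
CREATE TABLE hive_catalog.hive_db.events_snappy (id INT, city STRING)
PROPERTIES ("file_format" = "parquet", "compression_codec" = "snappy");
INSERT INTO hive_catalog.hive_db.events_snappy VALUES (1, 'Berlin');

-- No compression_codec property: the session variable applies, so ZSTD is used.
CREATE TABLE hive_catalog.hive_db.events_plain (id INT, city STRING)
PROPERTIES ("file_format" = "parquet");
INSERT INTO hive_catalog.hive_db.events_plain VALUES (1, 'Berlin');
```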

## Integration preparations

@@ -1021,7 +1023,7 @@ The following table describes a few key properties.
| ----------------- | ------------------------------------------------------------ |
| location | The file path in which you want to create the managed table. When you use HMS as metastore, you do not need to specify the `location` parameter, because StarRocks will create the table in the default file path of the current Hive catalog. When you use AWS Glue as metadata service:<ul><li>If you have specified the `location` parameter for the database in which you want to create the table, you do not need to specify the `location` parameter for the table. As such, the table defaults to the file path of the database to which it belongs. </li><li>If you have not specified the `location` for the database in which you want to create the table, you must specify the `location` parameter for the table.</li></ul> |
| file_format | The file format of the managed table. Supported file formats are Parquet, ORC, and Textfile. ORC and Textfile formats are supported from v3.3 onwards. Valid values: `parquet`, `orc`, and `textfile`. Default value: `parquet`. |
| compression_codec | The compression algorithm used for the managed table. This property is deprecated in v3.2.3, since which version the compression algorithm used for sinking data to Hive tables is uniformly controlled by the session variable [connector_sink_compression_codec](../../sql-reference/System_variable.md#connector_sink_compression_codec). |
| compression_codec | The compression algorithm used for the managed table. |

### Examples

@@ -1068,7 +1070,7 @@ Note that sinking data to external tables is disabled by default. To sink data t
:::note

- You can grant and revoke privileges by using [GRANT](../../sql-reference/sql-statements/account-management/GRANT.md) and [REVOKE](../../sql-reference/sql-statements/account-management/REVOKE.md).
- You can use the session variable [connector_sink_compression_codec](../../sql-reference/System_variable.md#connector_sink_compression_codec) to specify the compression algorithm used for sinking data to Hive tables.
- You can use the table property [`compression_codec`](../../data_source/catalog/hive_catalog.md#properties) or the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to specify the compression algorithm used for sinking data to Hive tables. StarRocks prioritizes the compression codec specified in the table property.

:::

2 changes: 1 addition & 1 deletion docs/en/data_source/catalog/iceberg/iceberg_catalog.md
@@ -1432,7 +1432,7 @@ Description: The file format of the Iceberg table. Only the Parquet format is su

###### compression_codec

Description: The compression algorithm used for the Iceberg table. The supported compression algorithms are SNAPPY, GZIP, ZSTD, and LZ4. Default value: `gzip`. This property is deprecated in v3.2.3, since which version the compression algorithm used for sinking data to Iceberg tables is uniformly controlled by the session variable [connector_sink_compression_codec](../../../sql-reference/System_variable.md#connector_sink_compression_codec).
Description: The compression algorithm used for the Iceberg table. The supported compression algorithms are SNAPPY, GZIP, ZSTD, and LZ4. Default value: `zstd`.
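 
A minimal sketch of setting this property at table creation time, assuming an Iceberg catalog named `iceberg_catalog` and hypothetical database and table names:

```SQL
-- Hypothetical names; this table's data files are compressed with SNAPPY instead of the default zstd.
CREATE TABLE iceberg_catalog.iceberg_db.sales (id INT, region STRING)
PROPERTIES ("compression_codec" = "snappy");
```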

---

5 changes: 4 additions & 1 deletion docs/en/sql-reference/System_variable.md
@@ -349,7 +349,10 @@ Used for MySQL client compatibility. No practical usage.

### connector_sink_compression_codec

* **Description**: Specifies the compression algorithm used for writing data into Hive tables or Iceberg tables, or exporting data with Files().
* **Description**: Specifies the compression algorithm used for writing data into Hive tables or Iceberg tables, or exporting data with Files(). This variable takes effect only in the following situations (see the sketch below):
  * The Hive table does not have the `compression_codec` property.
  * The Iceberg table has neither the `compression_codec` nor the `write.parquet.compression-codec` property.
  * The `compression` property is not set when exporting data with `INSERT INTO FILES()`.
* **Valid values**: `uncompressed`, `snappy`, `lz4`, `zstd`, and `gzip`.
* **Default**: uncompressed
* **Data type**: String
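 
A minimal sketch of the fallback behavior (the catalog, table, and bucket names are hypothetical, and FILES() would also need storage credentials in practice):

```SQL
SET connector_sink_compression_codec = 'zstd';

-- Takes effect: the target Hive table defines no compression_codec property.
INSERT INTO hive_catalog.hive_db.plain_table SELECT * FROM src_table;

-- Does not take effect: FILES() specifies its own compression property.
INSERT INTO FILES(
    "path" = "s3://my-bucket/export/",
    "format" = "parquet",
    "compression" = "snappy"
)
SELECT * FROM src_table;
```
 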
7 changes: 4 additions & 3 deletions docs/zh/data_source/catalog/hive_catalog.md
@@ -38,7 +38,8 @@ Hive Catalog is a type of External Catalog, supported from v2.3 onwards. Through Hi
- Parquet and ORC files support the NO_COMPRESSION, SNAPPY, LZ4, ZSTD, and GZIP compression formats.
- Textfile files support the NO_COMPRESSION compression format.

You can use the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to set the compression algorithm used when writing data to Hive tables.
You can use the table property [`compression_codec`](../../data_source/catalog/hive_catalog.md#properties) or the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to set the compression algorithm used when writing data to Hive tables.
When writing to a Hive table, StarRocks preferentially uses the compression codec specified in the table's properties to compress the written data. Otherwise, it uses the compression algorithm set in the session variable `connector_sink_compression_codec`.

## Integration preparations

@@ -1030,7 +1031,7 @@ PARTITION BY (par_col1[, par_col2...])
| ----------------- | ------------------------------------------------------------ |
| location | The file path in which the managed table resides. When you use HMS as the metadata service, you do not need to specify the `location` parameter. When you use AWS Glue as the metadata service:<ul><li>If you specified the `location` parameter when creating the current database, you do not need to specify it again when creating tables in that database; by default, StarRocks creates the table in the file path of the current database.</li><li>If you did not specify the `location` parameter when creating the current database, you must specify the `location` parameter when creating tables in that database.</li></ul> |
| file_format | The file format of the managed table. Parquet, ORC, and Textfile formats are currently supported, among which ORC and Textfile are supported from v3.3 onwards. Valid values: `parquet`, `orc`, and `textfile`. Default value: `parquet`. |
| compression_codec | The compression format of the managed table. This property is deprecated since v3.2.3, from which version the compression algorithm used when writing to Hive tables is uniformly controlled by the session variable [connector_sink_compression_codec](../../sql-reference/System_variable.md#connector_sink_compression_codec). |
| compression_codec | The compression format of the managed table. |

### Examples

@@ -1077,7 +1078,7 @@ PARTITION BY (par_col1[, par_col2...])
:::note

- You can grant and revoke privileges to and from users and roles by using [GRANT](../../sql-reference/sql-statements/account-management/GRANT.md) and [REVOKE](../../sql-reference/sql-statements/account-management/REVOKE.md).
- You can use the session variable [connector_sink_compression_codec](../../sql-reference/System_variable.md#connector_sink_compression_codec) to specify the compression algorithm used when writing data to Hive tables.
- You can use the table property [`compression_codec`](../../data_source/catalog/hive_catalog.md#properties) or the session variable [`connector_sink_compression_codec`](../../sql-reference/System_variable.md#connector_sink_compression_codec) to set the compression algorithm used when writing data to Hive tables. StarRocks prioritizes the compression codec specified in the table property.

:::

2 changes: 1 addition & 1 deletion docs/zh/data_source/catalog/iceberg/iceberg_catalog.md
@@ -1470,7 +1470,7 @@ ORDER BY (column_name [sort_direction] [nulls_order], ...)

###### compression_codec

Description: The compression algorithm used for the Iceberg table. The supported compression algorithms are SNAPPY, GZIP, ZSTD, and LZ4. Default value: `gzip`. This property is deprecated in v3.2.3; from that version onwards, the compression algorithm used for sinking data to Iceberg tables is uniformly controlled by the session variable [connector_sink_compression_codec](../../../sql-reference/System_variable.md#connector_sink_compression_codec).
Description: The compression algorithm used for the Iceberg table. The supported compression algorithms are SNAPPY, GZIP, ZSTD, and LZ4. Default value: `zstd`.

---

5 changes: 4 additions & 1 deletion docs/zh/sql-reference/System_variable.md
@@ -299,7 +299,10 @@ ALTER USER 'jack' SET PROPERTIES ('session.query_timeout' = '600');

### connector_sink_compression_codec

* Description: Specifies the compression algorithm used when writing data to Hive tables or Iceberg tables, or when exporting data with Files(). Valid values: `uncompressed`, `snappy`, `lz4`, `zstd`, `gzip`.
* Description: Specifies the compression algorithm used when writing data to Hive tables or Iceberg tables, or when exporting data with Files(). Valid values: `uncompressed`, `snappy`, `lz4`, `zstd`, `gzip`. This variable takes effect only in the following situations:
  * The Hive table does not have the `compression_codec` property.
  * The Iceberg table has neither the `compression_codec` nor the `write.parquet.compression-codec` property.
  * The `compression` property is not set when exporting data with `INSERT INTO FILES()`.
* Default: uncompressed
* Data type: String
* Introduced in: v3.2.3
@@ -68,7 +68,8 @@ public HiveTableSink(HiveTable hiveTable, TupleDescriptor desc, boolean isStatic
this.textFileFormatDesc = Optional.of(toTextFileFormatDesc(hiveTable.getSerdeProperties()));
this.compressionType = String.valueOf(TCompressionType.NO_COMPRESSION);
} else {
this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
this.compressionType = hiveTable.getProperties().getOrDefault("compression_codec",
sessionVariable.getConnectorSinkCompressionCodec());
}

this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
@@ -34,6 +34,8 @@
import static com.starrocks.sql.ast.OutFileClause.PARQUET_COMPRESSION_TYPE_MAP;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;

public class IcebergTableSink extends DataSink {
public final static int ICEBERG_SINK_MAX_DOP = 32;
@@ -60,7 +62,8 @@ public IcebergTableSink(IcebergTable icebergTable, TupleDescriptor desc, boolean
this.isStaticPartitionSink = isStaticPartitionSink;
this.fileFormat = nativeTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
.toLowerCase();
this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
this.compressionType = nativeTable.properties().getOrDefault(PARQUET_COMPRESSION,
sessionVariable.getConnectorSinkCompressionCodec());

Missing compression validation for Iceberg table properties

The IcebergTableSink now reads compressionType from native table properties (line 65-66), but unlike HiveTableSink, there's no validation before using it in toThrift(). At line 124, PARQUET_COMPRESSION_TYPE_MAP.get(compressionType) can return null if the table property contains an unsupported compression codec. This null is then passed to setCompression_type(). In contrast, HiveTableSink validates using Preconditions.checkState() before calling .get(). If an Iceberg table was created outside StarRocks with an unusual compression codec, this could cause unexpected behavior in the backend.


this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
sessionVariable.getConnectorSinkTargetMaxFileSize() : 1024L * 1024 * 1024;
this.targetBranch = targetBranch;
104 changes: 103 additions & 1 deletion fe/fe-core/src/test/java/com/starrocks/planner/HiveTableSinkTest.java
@@ -42,6 +42,7 @@
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static com.starrocks.server.CatalogMgr.ResourceMappingCatalog.toResourceName;
import static com.starrocks.sql.analyzer.AnalyzeTestUtil.getStarRocksAssert;
@@ -118,6 +119,107 @@ public void testHiveTableSink(@Mocked CatalogConnector hiveConnector) {
builder.setStorageFormat(HiveStorageFormat.AVRO);
ExceptionChecker.expectThrowsWithMsg(StarRocksConnectorException.class,
"Writing to hive table in [AVRO] format is not supported",
() ->new HiveTableSink(builder.build(), desc, true, new SessionVariable()));
() -> new HiveTableSink(builder.build(), desc, true, new SessionVariable()));
}

@Test
public void testCompressionFromHiveTableProperty(@Mocked CatalogConnector hiveConnector,
@Mocked SessionVariable sessionVariable) {
// hiveTable.properties contains compression_codec -> should use it
Map<String, String> props = new HashMap<>();
props.put("compression_codec", "snappy");

HiveTable.Builder builder = HiveTable.builder()
.setId(1)
.setTableName("hive_table")
.setCatalogName("hive_catalog")
.setHiveDbName("hive_db")
.setHiveTableName("hive_table")
.setPartitionColumnNames(java.util.Collections.singletonList("p1"))
.setDataColumnNames(java.util.Collections.singletonList("c1"))
.setFullSchema(java.util.Arrays.asList(new Column("c1", IntegerType.INT), new Column("p1", IntegerType.INT)))
.setTableLocation("hdfs://hadoop01:9000/tableLocation")
.setProperties(props)
.setStorageFormat(HiveStorageFormat.PARQUET)
.setCreateTime(System.currentTimeMillis());

new Expectations() {
{
CloudConfiguration cloudConfig = new CloudConfiguration();
cloudConfig.loadCommonFields(new HashMap<>());
hiveConnector.getMetadata().getCloudConfiguration();
result = cloudConfig;
minTimes = 1;
}
};

ConnectorMgr connectorMgr = AnalyzeTestUtil.getConnectContext().getGlobalStateMgr().getConnectorMgr();
new Expectations(connectorMgr) {{
connectorMgr.getConnector("hive_catalog");
result = hiveConnector;
minTimes = 1;

sessionVariable.getConnectorSinkCompressionCodec();
result = "gzip";
sessionVariable.getHiveTempStagingDir();
result = "/tmp";
}};

TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
HiveTableSink sink = new HiveTableSink(builder.build(), desc, true, sessionVariable);
TDataSink tDataSink = sink.toThrift();
THiveTableSink tHiveTableSink = tDataSink.getHive_table_sink();
Assertions.assertEquals(TCompressionType.SNAPPY, tHiveTableSink.getCompression_type());
}

@Test
public void testCompressionFallbackToSessionVariable(@Mocked CatalogConnector hiveConnector,
@Mocked SessionVariable sessionVariable) {
// hiveTable.properties does NOT contain compression_codec -> should fallback to sessionVariable
HiveTable.Builder builder = HiveTable.builder()
.setId(2)
.setTableName("hive_table2")
.setCatalogName("hive_catalog")
.setHiveDbName("hive_db")
.setHiveTableName("hive_table2")
.setPartitionColumnNames(java.util.Collections.singletonList("p1"))
.setDataColumnNames(java.util.Collections.singletonList("c1"))
.setFullSchema(java.util.Arrays.asList(new Column("c1", IntegerType.INT), new Column("p1", IntegerType.INT)))
.setTableLocation("hdfs://hadoop01:9000/tableLocation")
.setProperties(new HashMap<>())
.setStorageFormat(HiveStorageFormat.PARQUET)
.setCreateTime(System.currentTimeMillis());

new Expectations() {
{
CloudConfiguration cloudConfig = new CloudConfiguration();
cloudConfig.loadCommonFields(new HashMap<>());
hiveConnector.getMetadata().getCloudConfiguration();
result = cloudConfig;
minTimes = 1;

// session variable returns fallback compression codec and staging dir
sessionVariable.getConnectorSinkCompressionCodec();
result = "gzip";
sessionVariable.getHiveTempStagingDir();
result = "/tmp";
}
};

ConnectorMgr connectorMgr = AnalyzeTestUtil.getConnectContext().getGlobalStateMgr().getConnectorMgr();
new Expectations(connectorMgr) {
{
connectorMgr.getConnector("hive_catalog");
result = hiveConnector;
minTimes = 1;
}
};

TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
HiveTableSink sink = new HiveTableSink(builder.build(), desc, true, sessionVariable);
TDataSink tDataSink = sink.toThrift();
THiveTableSink tHiveTableSink = tDataSink.getHive_table_sink();
Assertions.assertEquals(TCompressionType.GZIP, tHiveTableSink.getCompression_type());
}

}