
Commit 438b9e5 (parent 8eef323)

[Enhancement] Prioritize using the "compression_codec" table property in connector sink modules.

Signed-off-by: Gavin <[email protected]>
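
Both sink changes in this commit implement the same precedence rule: a table-level codec property, when present, overrides the session-level default returned by SessionVariable.getConnectorSinkCompressionCodec(). A minimal standalone sketch of that resolution, using illustrative names (CodecPrecedenceSketch and resolveCodec are not StarRocks APIs):

import java.util.Map;

// Standalone sketch, not StarRocks code: a table-level codec property wins;
// the session-level connector sink codec is only the fallback.
public class CodecPrecedenceSketch {
    static String resolveCodec(Map<String, String> tableProperties, String sessionCodec) {
        return tableProperties.getOrDefault("compression_codec", sessionCodec);
    }

    public static void main(String[] args) {
        // Table property present: it takes priority over the session default.
        System.out.println(resolveCodec(Map.of("compression_codec", "snappy"), "gzip")); // snappy
        // No table property: the session variable's codec is used.
        System.out.println(resolveCodec(Map.of(), "gzip")); // gzip
    }
}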

File tree

4 files changed: +280, -3 lines

fe/fe-core/src/main/java/com/starrocks/planner/HiveTableSink.java
fe/fe-core/src/main/java/com/starrocks/planner/IcebergTableSink.java
fe/fe-core/src/test/java/com/starrocks/planner/HiveTableSinkTest.java
fe/fe-core/src/test/java/com/starrocks/planner/IcebergTableSinkTest.java


fe/fe-core/src/main/java/com/starrocks/planner/HiveTableSink.java

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,8 @@ public HiveTableSink(HiveTable hiveTable, TupleDescriptor desc, boolean isStatic
             this.textFileFormatDesc = Optional.of(toTextFileFormatDesc(hiveTable.getSerdeProperties()));
             this.compressionType = String.valueOf(TCompressionType.NO_COMPRESSION);
         } else {
-            this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
+            this.compressionType = hiveTable.getProperties().getOrDefault("compression_codec",
+                    sessionVariable.getConnectorSinkCompressionCodec());
         }

         this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?

fe/fe-core/src/main/java/com/starrocks/planner/IcebergTableSink.java

Lines changed: 4 additions & 1 deletion
@@ -34,6 +34,8 @@
 import static com.starrocks.sql.ast.OutFileClause.PARQUET_COMPRESSION_TYPE_MAP;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;

 public class IcebergTableSink extends DataSink {
     public final static int ICEBERG_SINK_MAX_DOP = 32;
@@ -60,7 +62,8 @@ public IcebergTableSink(IcebergTable icebergTable, TupleDescriptor desc, boolean
         this.isStaticPartitionSink = isStaticPartitionSink;
         this.fileFormat = nativeTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
                 .toLowerCase();
-        this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
+        this.compressionType = nativeTable.properties().getOrDefault(PARQUET_COMPRESSION,
+                sessionVariable.getConnectorSinkCompressionCodec());
         this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
                 sessionVariable.getConnectorSinkTargetMaxFileSize() : 1024L * 1024 * 1024;
         this.targetBranch = targetBranch;
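
For the Iceberg sink the lookup key is Iceberg's own TableProperties.PARQUET_COMPRESSION constant, i.e. the standard table property "write.parquet.compression-codec", rather than the literal "compression_codec" string used on the Hive side. A sketch of the same lookup against that constant, assuming iceberg-api is on the classpath (IcebergCodecSketch and resolveCodec are illustrative names, not the sink's API):

import java.util.Map;

import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; // "write.parquet.compression-codec"

// Sketch only: the Iceberg path keys its getOrDefault lookup on Iceberg's
// TableProperties constant and falls back to the session-level codec.
class IcebergCodecSketch {
    static String resolveCodec(Map<String, String> nativeTableProps, String sessionCodec) {
        return nativeTableProps.getOrDefault(PARQUET_COMPRESSION, sessionCodec);
    }
}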

fe/fe-core/src/test/java/com/starrocks/planner/HiveTableSinkTest.java

Lines changed: 103 additions & 1 deletion
@@ -42,6 +42,7 @@
 import org.junit.jupiter.api.Test;

 import java.util.HashMap;
+import java.util.Map;

 import static com.starrocks.server.CatalogMgr.ResourceMappingCatalog.toResourceName;
 import static com.starrocks.sql.analyzer.AnalyzeTestUtil.getStarRocksAssert;
@@ -118,6 +119,107 @@ public void testHiveTableSink(@Mocked CatalogConnector hiveConnector) {
         builder.setStorageFormat(HiveStorageFormat.AVRO);
         ExceptionChecker.expectThrowsWithMsg(StarRocksConnectorException.class,
                 "Writing to hive table in [AVRO] format is not supported",
-                () ->new HiveTableSink(builder.build(), desc, true, new SessionVariable()));
+                () -> new HiveTableSink(builder.build(), desc, true, new SessionVariable()));
     }
+
+    @Test
+    public void testCompressionFromHiveTableProperty(@Mocked CatalogConnector hiveConnector,
+                                                     @Mocked SessionVariable sessionVariable) {
+        // hiveTable.properties contains compression_codec -> should use it
+        Map<String, String> props = new HashMap<>();
+        props.put("compression_codec", "snappy");
+
+        HiveTable.Builder builder = HiveTable.builder()
+                .setId(1)
+                .setTableName("hive_table")
+                .setCatalogName("hive_catalog")
+                .setHiveDbName("hive_db")
+                .setHiveTableName("hive_table")
+                .setPartitionColumnNames(java.util.Collections.singletonList("p1"))
+                .setDataColumnNames(java.util.Collections.singletonList("c1"))
+                .setFullSchema(java.util.Arrays.asList(new Column("c1", IntegerType.INT), new Column("p1", IntegerType.INT)))
+                .setTableLocation("hdfs://hadoop01:9000/tableLocation")
+                .setProperties(props)
+                .setStorageFormat(HiveStorageFormat.PARQUET)
+                .setCreateTime(System.currentTimeMillis());
+
+        new Expectations() {
+            {
+                CloudConfiguration cloudConfig = new CloudConfiguration();
+                cloudConfig.loadCommonFields(new HashMap<>());
+                hiveConnector.getMetadata().getCloudConfiguration();
+                result = cloudConfig;
+                minTimes = 1;
+            }
+        };
+
+        ConnectorMgr connectorMgr = AnalyzeTestUtil.getConnectContext().getGlobalStateMgr().getConnectorMgr();
+        new Expectations(connectorMgr) {{
+            connectorMgr.getConnector("hive_catalog");
+            result = hiveConnector;
+            minTimes = 1;
+
+            sessionVariable.getConnectorSinkCompressionCodec();
+            result = "gzip";
+            sessionVariable.getHiveTempStagingDir();
+            result = "/tmp";
+        }};
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HiveTableSink sink = new HiveTableSink(builder.build(), desc, true, sessionVariable);
+        TDataSink tDataSink = sink.toThrift();
+        THiveTableSink tHiveTableSink = tDataSink.getHive_table_sink();
+        Assertions.assertEquals(TCompressionType.SNAPPY, tHiveTableSink.getCompression_type());
+    }
+
+    @Test
+    public void testCompressionFallbackToSessionVariable(@Mocked CatalogConnector hiveConnector,
+                                                         @Mocked SessionVariable sessionVariable) {
+        // hiveTable.properties does NOT contain compression_codec -> should fallback to sessionVariable
+        HiveTable.Builder builder = HiveTable.builder()
+                .setId(2)
+                .setTableName("hive_table2")
+                .setCatalogName("hive_catalog")
+                .setHiveDbName("hive_db")
+                .setHiveTableName("hive_table2")
+                .setPartitionColumnNames(java.util.Collections.singletonList("p1"))
+                .setDataColumnNames(java.util.Collections.singletonList("c1"))
+                .setFullSchema(java.util.Arrays.asList(new Column("c1", IntegerType.INT), new Column("p1", IntegerType.INT)))
+                .setTableLocation("hdfs://hadoop01:9000/tableLocation")
+                .setProperties(new HashMap<>())
+                .setStorageFormat(HiveStorageFormat.PARQUET)
+                .setCreateTime(System.currentTimeMillis());
+
+        new Expectations() {
+            {
+                CloudConfiguration cloudConfig = new CloudConfiguration();
+                cloudConfig.loadCommonFields(new HashMap<>());
+                hiveConnector.getMetadata().getCloudConfiguration();
+                result = cloudConfig;
+                minTimes = 1;
+
+                // session variable returns fallback compression codec and staging dir
+                sessionVariable.getConnectorSinkCompressionCodec();
+                result = "gzip";
+                sessionVariable.getHiveTempStagingDir();
+                result = "/tmp";
+            }
+        };
+
+        ConnectorMgr connectorMgr = AnalyzeTestUtil.getConnectContext().getGlobalStateMgr().getConnectorMgr();
+        new Expectations(connectorMgr) {
+            {
+                connectorMgr.getConnector("hive_catalog");
+                result = hiveConnector;
+                minTimes = 1;
+            }
+        };
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HiveTableSink sink = new HiveTableSink(builder.build(), desc, true, sessionVariable);
+        TDataSink tDataSink = sink.toThrift();
+        THiveTableSink tHiveTableSink = tDataSink.getHive_table_sink();
+        Assertions.assertEquals(TCompressionType.GZIP, tHiveTableSink.getCompression_type());
+    }
+
 }

fe/fe-core/src/test/java/com/starrocks/planner/IcebergTableSinkTest.java (new file)

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// language: java
+package com.starrocks.planner;
+
+import com.google.common.collect.Maps;
+import com.starrocks.catalog.IcebergTable;
+import com.starrocks.connector.CatalogConnector;
+import com.starrocks.connector.ConnectorMgr;
+import com.starrocks.credential.CloudConfiguration;
+import com.starrocks.credential.CloudConfigurationFactory;
+import com.starrocks.qe.SessionVariable;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.thrift.TCompressionType;
+import com.starrocks.thrift.TDataSink;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+
+public class IcebergTableSinkTest {
+
+    @Test
+    public void testCompressionFromNativeTableProperty(@Mocked GlobalStateMgr globalStateMgr, @Mocked ConnectorMgr connectorMgr,
+                                                       @Mocked CatalogConnector connector, @Mocked IcebergTable icebergTable,
+                                                       @Mocked Table nativeTable, @Mocked FileIO fileIO,
+                                                       @Mocked SessionVariable sessionVariable) {
+        // nativeTable.properties contains parquet compression -> should use it
+        Map<String, String> nativeProps = Maps.newHashMap();
+        nativeProps.put(PARQUET_COMPRESSION, "zstd");
+
+        CloudConfiguration cc = CloudConfigurationFactory.buildCloudConfigurationForStorage(new HashMap<>());
+
+        new Expectations() {
+            {
+                // iceberg table / native table basic
+                icebergTable.getNativeTable();
+                result = nativeTable;
+
+                icebergTable.getCatalogName();
+                result = "catA";
+
+                icebergTable.getId();
+                result = 101L;
+
+                icebergTable.getUUID();
+                result = "uuidA";
+
+                nativeTable.location();
+                result = "s3://bucket/a";
+
+                nativeTable.properties();
+                result = nativeProps;
+
+                nativeTable.io();
+                result = fileIO;
+
+                fileIO.properties();
+                result = new HashMap<String, String>();
+
+                // Global state and connector metadata fallback path
+                GlobalStateMgr.getCurrentState();
+                result = globalStateMgr;
+
+                globalStateMgr.getConnectorMgr();
+                result = connectorMgr;
+
+                connectorMgr.getConnector("catA");
+                result = connector;
+
+                connector.getMetadata().getCloudConfiguration();
+                result = cc;
+
+                // session variable should not be used for compression in this case, but stub anyway
+                sessionVariable.getConnectorSinkCompressionCodec();
+                result = "gzip";
+
+                sessionVariable.getConnectorSinkTargetMaxFileSize();
+                result = 0L;
+            }
+        };
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        IcebergTableSink sink = new IcebergTableSink(icebergTable, desc, false, sessionVariable, null);
+        TDataSink t = sink.toThrift();
+        // compression_type should map "zstd" -> TCompressionType.ZSTD
+        Assertions.assertEquals(TCompressionType.ZSTD, t.getIceberg_table_sink().getCompression_type());
+    }
+
+    @Test
+    public void testCompressionFromSessionVariableFallback(@Mocked GlobalStateMgr globalStateMgr,
+                                                           @Mocked ConnectorMgr connectorMgr, @Mocked CatalogConnector connector,
+                                                           @Mocked IcebergTable icebergTable, @Mocked Table nativeTable,
+                                                           @Mocked FileIO fileIO, @Mocked SessionVariable sessionVariable) {
+        // nativeTable.properties does NOT contain parquet compression -> should fallback to sessionVariable
+        Map<String, String> nativeProps = Maps.newHashMap(); // empty
+
+        CloudConfiguration cc = CloudConfigurationFactory.buildCloudConfigurationForStorage(new HashMap<>());
+
+        new Expectations() {
+            {
+                icebergTable.getNativeTable();
+                result = nativeTable;
+
+                icebergTable.getCatalogName();
+                result = "catB";
+
+                icebergTable.getId();
+                result = 102L;
+
+                icebergTable.getUUID();
+                result = "uuidB";
+
+                nativeTable.location();
+                result = "s3://bucket/b";
+
+                nativeTable.properties();
+                result = nativeProps;
+
+                nativeTable.io();
+                result = fileIO;
+
+                fileIO.properties();
+                result = new HashMap<String, String>();
+
+                GlobalStateMgr.getCurrentState();
+                result = globalStateMgr;
+
+                globalStateMgr.getConnectorMgr();
+                result = connectorMgr;
+
+                connectorMgr.getConnector("catB");
+                result = connector;
+
+                connector.getMetadata().getCloudConfiguration();
+                result = cc;
+
+                // session variable provides fallback compression codec
+                sessionVariable.getConnectorSinkCompressionCodec();
+                result = "gzip";
+                sessionVariable.getConnectorSinkTargetMaxFileSize();
+                result = 0L;
+            }
+        };
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        IcebergTableSink sink = new IcebergTableSink(icebergTable, desc, false, sessionVariable, null);
+        TDataSink t = sink.toThrift();
+        // fallback "gzip" -> TCompressionType.GZIP
+        Assertions.assertEquals(TCompressionType.GZIP, t.getIceberg_table_sink().getCompression_type());
+    }
+}
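
The assertions above depend on the sink translating codec names such as "zstd" and "gzip" into TCompressionType values. A generic sketch of that kind of name-to-enum translation; the actual StarRocks mapping (e.g. the PARQUET_COMPRESSION_TYPE_MAP imported by IcebergTableSink) may differ, and the unknown-name fallback below is an assumption:

import java.util.Locale;

// Generic sketch of a codec-name-to-enum lookup, not the StarRocks mapping.
enum CodecName {
    SNAPPY, GZIP, ZSTD, LZ4, NO_COMPRESSION;

    static CodecName fromString(String name) {
        try {
            return valueOf(name.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return NO_COMPRESSION; // assumed fallback for unrecognized names
        }
    }
}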
