
Commit 4bbecc3

HIVE-29354: Shredded Variant PPD
1 parent 3e71f4d commit 4bbecc3

13 files changed: +749 -21 lines

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 13 additions & 0 deletions

@@ -39,6 +39,7 @@ private InputFormatConfig() {
   public static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering";
   public static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time";
   public static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";
+  public static final String VARIANT_FILTER_EXPRESSION = "iceberg.mr.variant.filter.expression";
   public static final String GROUPING_PARTITION_COLUMNS = "iceberg.mr.grouping.partition.columns";
   public static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
   public static final String READ_SCHEMA = "iceberg.mr.read.schema";

@@ -214,4 +215,16 @@ private static Schema schema(Configuration conf, String key) {
     return json == null ? null : SchemaParser.fromJson(json);
   }

+  public static Expression variantFilter(Configuration conf) {
+    String serialized = conf.get(VARIANT_FILTER_EXPRESSION);
+    return serialized == null ? null : SerializationUtil.deserializeFromBase64(serialized);
+  }
+
+  public static void setVariantFilter(Configuration conf, Expression expression) {
+    if (expression == null) {
+      conf.unset(VARIANT_FILTER_EXPRESSION);
+    } else {
+      conf.set(VARIANT_FILTER_EXPRESSION, SerializationUtil.serializeToBase64(expression));
+    }
+  }
 }
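
The two new helpers round-trip an Iceberg Expression through the job conf as Base64. A minimal usage sketch (not part of the commit; the column name is illustrative only), assuming Iceberg's Expressions factory and a plain Hadoop Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.InputFormatConfig;

public class VariantFilterConfRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Store: serialized to Base64 under iceberg.mr.variant.filter.expression.
    Expression expr = Expressions.greaterThan("v.typed_value.price", 100);
    InputFormatConfig.setVariantFilter(conf, expr);

    // Load: readers deserialize it back; null means "no variant filter set".
    Expression restored = InputFormatConfig.variantFilter(conf);
    System.out.println(restored);

    // Clear: passing null unsets the key, so a stale filter cannot leak into
    // later queries that reuse the same conf.
    InputFormatConfig.setVariantFilter(conf, null);
    System.out.println(InputFormatConfig.variantFilter(conf)); // prints null
  }
}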

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java

Lines changed: 38 additions & 8 deletions

@@ -90,6 +90,8 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     }
   }

+  private static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
+
   /**
    * Converts the Hive filter found in the job conf to an Iceberg filter expression.
    * @param conf - job conf

@@ -117,15 +119,21 @@ static Expression icebergDataFilterFromHiveConf(Configuration conf) {
    * @return Iceberg Filter Expression
    */
   static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
-    if (exprNodeDesc != null) {
-      SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
-      try {
-        return HiveIcebergFilterFactory.generateFilterExpression(sarg);
-      } catch (UnsupportedOperationException e) {
-        LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ", e);
-      }
+    if (exprNodeDesc == null) {
+      InputFormatConfig.setVariantFilter(conf, null);
+      return null;
     }
-    return null;
+
+    Expression icebergFilter = buildFilterExpression(conf, exprNodeDesc, "Iceberg");
+
+    if (Boolean.parseBoolean(conf.get(VARIANT_SHREDDING_ENABLED))) {
+      Expression variantFilter = buildVariantFilter(conf, exprNodeDesc);
+      InputFormatConfig.setVariantFilter(conf, variantFilter);
+    } else {
+      InputFormatConfig.setVariantFilter(conf, null);
+    }
+
+    return icebergFilter;
   }

@@ -146,6 +154,28 @@ public static Expression residualForTask(FileScanTask task, Configuration conf)
     ).residualFor(task.file().partition());
   }

+  private static Expression buildFilterExpression(
+      Configuration conf, ExprNodeGenericFuncDesc expression, String filterLabel) {
+    try {
+      SearchArgument sarg = ConvertAstToSearchArg.create(conf, expression);
+      return HiveIcebergFilterFactory.generateFilterExpression(sarg);
+    } catch (UnsupportedOperationException e) {
+      LOG.warn("Unable to create {} filter, proceeding without it: ", filterLabel, e);
+      return null;
+    }
+  }
+
+  private static Expression buildVariantFilter(
+      Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
+    ExprNodeGenericFuncDesc rewrittenExpr = VariantFilterRewriter.rewriteForShredding(exprNodeDesc, true);
+    String exprString = rewrittenExpr == null ? null : rewrittenExpr.getExprString();
+    if (exprString == null || !exprString.contains(".typed_value.")) {
+      return null;
+    }
+
+    return buildFilterExpression(conf, rewrittenExpr, "variant");
+  }
+
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     Expression filter = icebergDataFilterFromHiveConf(job);
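
Net behavior change in getFilterExpr: the Iceberg filter is still built from the original AST, but when the job conf carries variant.shredding.enabled=true, a second filter is derived from a copy of the AST rewritten against the shredded layout and stored in the conf via InputFormatConfig.setVariantFilter (and explicitly cleared on every other path, so a stale filter from a previous query can never apply). The variant filter is kept only when the rewrite actually produced a shredded reference, detected by ".typed_value." in the expression string. Illustratively (assuming the variant_get UDF named in VariantFilterRewriter's imports), a predicate such as

    WHERE variant_get(v, '$.price') > 100

is handed to the SARG converter as if it were

    WHERE v.typed_value.price > 100

so the generated Iceberg expression points at the shredded Parquet column rather than the opaque variant binary.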

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 26 additions & 1 deletion

@@ -281,6 +281,8 @@ public HiveAuthorizationProvider getAuthorizationProvider() {
     return null;
   }

+  private static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
+
   @Override
   public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
     overlayTableProperties(conf, tableDesc, map);

@@ -293,6 +295,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String>

     map.put(ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,
         String.valueOf(allowDataFilesWithinTableLocationOnly));
+    propagateVariantShreddingProperty(tableDesc, map);
   }

@@ -311,7 +314,7 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String>
     tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
     SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
         .ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
-
+    propagateVariantShreddingProperty(tableDesc, map);
   }

@@ -356,6 +359,8 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
         jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
       }
     }
+    propagateVariantShreddingProperty(tableDesc, jobConf);
+
     try {
       if (!jobConf.getBoolean(ConfVars.HIVE_IN_TEST_IDE.varname, false)) {
         // For running unit test this won't work as maven surefire CP is different than what we have on a cluster:

@@ -1523,6 +1528,26 @@ private static void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }

+  private static void propagateVariantShreddingProperty(TableDesc tableDesc, Map<String, String> target) {
+    if (tableDesc == null || tableDesc.getProperties() == null || target == null) {
+      return;
+    }
+    String value = tableDesc.getProperties().getProperty(VARIANT_SHREDDING_ENABLED);
+    if (value != null) {
+      target.put(VARIANT_SHREDDING_ENABLED, value);
+    }
+  }
+
+  private static void propagateVariantShreddingProperty(TableDesc tableDesc, JobConf jobConf) {
+    if (tableDesc == null || tableDesc.getProperties() == null || jobConf == null) {
+      return;
+    }
+    String value = tableDesc.getProperties().getProperty(VARIANT_SHREDDING_ENABLED);
+    if (value != null) {
+      jobConf.set(VARIANT_SHREDDING_ENABLED, value);
+    }
+  }
+
   public StorageHandlerTypes getType() {
     return StorageHandlerTypes.ICEBERG;
   }
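
The two private overloads are needed because JobConf extends Hadoop's Configuration, which does not implement Map<String, String>; they differ only in target.put(...) versus jobConf.set(...). Propagating the table property in all three configure hooks means both the scan side (where HiveIcebergInputFormat decides whether to build a variant filter) and the write side observe the same variant.shredding.enabled value declared in the table's TBLPROPERTIES.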

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/VariantFilterRewriter.java

Lines changed: 139 additions & 0 deletions

@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTryVariantGet;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFVariantGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class VariantFilterRewriter {
+  private static final Logger LOG = LoggerFactory.getLogger(VariantFilterRewriter.class);
+
+  private VariantFilterRewriter() {
+  }
+
+  static ExprNodeGenericFuncDesc rewriteForShredding(
+      ExprNodeGenericFuncDesc predicate, boolean shreddingEnabled) {
+    if (!shreddingEnabled || predicate == null) {
+      return predicate;
+    }
+
+    ExprNodeGenericFuncDesc cloned = (ExprNodeGenericFuncDesc) predicate.clone();
+    ExprNodeDesc rewrittenRoot = rewriteNode(cloned);
+    if (rewrittenRoot instanceof ExprNodeGenericFuncDesc) {
+      return (ExprNodeGenericFuncDesc) rewrittenRoot;
+    }
+
+    // If rewrites ended up replacing the root, fall back to the original predicate clone to avoid
+    // changing the expected root type.
+    return cloned;
+  }
+
+  private static ExprNodeDesc rewriteNode(ExprNodeDesc node) {
+    if (node == null) {
+      return null;
+    }
+
+    if (node instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) node;
+      List<ExprNodeDesc> children = funcDesc.getChildren();
+      if (children != null) {
+        for (int i = 0; i < children.size(); i++) {
+          ExprNodeDesc rewrittenChild = rewriteNode(children.get(i));
+          children.set(i, rewrittenChild);
+        }
+      }
+
+      if (isVariantGet(funcDesc)) {
+        ExprNodeDesc replacement = rewriteVariantFunction(funcDesc);
+        if (replacement != null) {
+          return replacement;
+        }
+      }
+    }
+
+    return node;
+  }
+
+  private static boolean isVariantGet(ExprNodeGenericFuncDesc funcDesc) {
+    GenericUDF udf = funcDesc.getGenericUDF();
+    return udf instanceof GenericUDFVariantGet || udf instanceof GenericUDFTryVariantGet;
+  }
+
+  private static ExprNodeDesc rewriteVariantFunction(ExprNodeGenericFuncDesc funcDesc) {
+    List<ExprNodeDesc> args = funcDesc.getChildren();
+    if (args == null || args.size() < 2) {
+      return null;
+    }
+
+    ExprNodeDesc variantColumn = args.get(0);
+    ExprNodeDesc jsonPathDesc = args.get(1);
+
+    if (!(variantColumn instanceof ExprNodeColumnDesc) ||
+        !(jsonPathDesc instanceof ExprNodeConstantDesc)) {
+      return null;
+    }
+
+    Object literal = ((ExprNodeConstantDesc) jsonPathDesc).getValue();
+    if (!(literal instanceof String)) {
+      return null;
+    }
+
+    String fieldName = extractTopLevelField((String) literal);
+    if (fieldName == null) {
+      return null;
+    }
+
+    ExprNodeColumnDesc variantColumnDesc = (ExprNodeColumnDesc) variantColumn;
+    String shreddedColumn = variantColumnDesc.getColumn() + ".typed_value." + fieldName;
+    LOG.debug("Rewriting variant predicate to use shredded column {}", shreddedColumn);
+    return new ExprNodeColumnDesc(
+        funcDesc.getTypeInfo(),
+        shreddedColumn,
+        variantColumnDesc.getTabAlias(),
+        variantColumnDesc.getIsPartitionColOrVirtualCol());
+  }
+
+  private static String extractTopLevelField(String jsonPath) {
+    if (jsonPath == null) {
+      return null;
+    }
+
+    String trimmed = jsonPath.trim();
+    if (!trimmed.startsWith("$.") || trimmed.length() <= 2) {
+      return null;
+    }
+
+    String remaining = trimmed.substring(2);
+    // Only allow top-level field names (no nested objects or arrays)
+    if (remaining.contains(".") || remaining.contains("[") || remaining.contains("]")) {
+      return null;
+    }
+
+    return remaining;
+  }
+}
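
A sketch of how the rewrite behaves (a hypothetical demo class, not part of the commit; it must sit in org.apache.iceberg.mr.hive because VariantFilterRewriter is package-private, and hand-building the Hive AST for variant_get is elided):

package org.apache.iceberg.mr.hive;

public class VariantFilterRewriterDemo {
  public static void main(String[] args) {
    // Guard paths: a null predicate, or shredding disabled, comes back unchanged.
    assert VariantFilterRewriter.rewriteForShredding(null, true) == null;

    // For a real predicate the rewrite is purely structural, shown here as
    // expression strings rather than constructed ExprNodeDesc trees:
    //   variant_get(v, '$.price') > 100   ->   v.typed_value.price > 100
    // Non-shreddable accesses are left in place for Hive to evaluate at run time:
    //   variant_get(v, '$.a.b')   -> unchanged (nested path)
    //   variant_get(v, '$.xs[0]') -> unchanged (array access)
    //   variant_get(f(v), '$.a')  -> unchanged (argument is not a plain column)
  }
}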

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

Lines changed: 3 additions & 3 deletions

@@ -86,8 +86,8 @@ protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
     // Configure variant shredding function if conditions are met:
     if (hasVariantColumns(dataSchema()) && isVariantShreddingEnabled(properties)) {
-      builder.variantShreddingFunc(
-          Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema()));
+      var shreddingFunction = Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema());
+      builder.variantShreddingFunc(shreddingFunction);
     }
   }

@@ -182,7 +182,7 @@ private static boolean isVariantShreddingEnabled(Map<String, String> properties)
    * Should be called before the Parquet writer is created.
    */
   public void initialize(Record record) {
-    if (this.sampleRecord != null) {
+    if (this.sampleRecord == null) {
       this.sampleRecord = record;
     }
   }
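
The second hunk is the substantive fix: with != null, sampleRecord could never be assigned (it starts out null and was only overwritten once already set), so the shredding function above always received a null sample record. With == null, initialize() captures the first record seen, which is what constructVariantShreddingFunction derives the shredded schema from. The first hunk merely extracts that call into a local variable.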

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java

Lines changed: 8 additions & 0 deletions

@@ -60,6 +60,7 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.orc.ORC;

@@ -225,6 +226,12 @@ private CloseableIterable<T> newParquetIterable(
         .caseSensitive(isCaseSensitive())
         .split(task.start(), task.length());

+    Expression variantFilterExpr =
+        InputFormatConfig.variantFilter(getContext().getConfiguration());
+    if (variantFilterExpr != null) {
+      parquetReadBuilder.variantFilter(variantFilterExpr);
+    }
+
     if (isReuseContainers()) {
       parquetReadBuilder.reuseContainers();
     }

@@ -290,4 +297,5 @@ private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integ
     return TypeUtil.selectNot(readSchema, collect);
   }
+
 }
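
On the read side, the stored filter is recovered from the job conf and, when present, passed to the Parquet read builder's variantFilter(...) hook, letting predicate push-down prune data using the shredded typed_value columns. When nothing was stored (shredding disabled, or no shreddable variant_get in the predicate), the builder is left untouched and reads behave exactly as before.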
