
Commit d0f7120

HIVE-29354: Shredded Variant PPD

1 parent 7c3325a

12 files changed, +719 −22 lines

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 13 additions & 0 deletions
@@ -39,6 +39,7 @@ private InputFormatConfig() {
   public static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering";
   public static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time";
   public static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";
+  public static final String VARIANT_FILTER_EXPRESSION = "iceberg.mr.variant.filter.expression";
   public static final String GROUPING_PARTITION_COLUMNS = "iceberg.mr.grouping.partition.columns";
   public static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
   public static final String READ_SCHEMA = "iceberg.mr.read.schema";
@@ -215,4 +216,16 @@ private static Schema schema(Configuration conf, String key) {
     return json == null ? null : SchemaParser.fromJson(json);
   }

+  public static Expression variantFilter(Configuration conf) {
+    String serialized = conf.get(VARIANT_FILTER_EXPRESSION);
+    return serialized == null ? null : SerializationUtil.deserializeFromBase64(serialized);
+  }
+
+  public static void setVariantFilter(Configuration conf, Expression expression) {
+    if (expression == null) {
+      conf.unset(VARIANT_FILTER_EXPRESSION);
+    } else {
+      conf.set(VARIANT_FILTER_EXPRESSION, SerializationUtil.serializeToBase64(expression));
+    }
+  }
 }
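These two helpers give the split planner and the record reader a shared channel through the Hadoop Configuration. A minimal sketch of their contract, assuming only the Iceberg Expressions API; the shredded column name and the literal are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.InputFormatConfig;

public class VariantFilterRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical filter over a shredded variant field; any serializable Expression works.
    Expression expr = Expressions.greaterThan("v.typed_value.price", 10);

    InputFormatConfig.setVariantFilter(conf, expr);
    // variantFilter() deserializes the Base64 payload back into an Expression.
    System.out.println(InputFormatConfig.variantFilter(conf));

    // Passing null unsets the key entirely, so a later read returns null.
    InputFormatConfig.setVariantFilter(conf, null);
    System.out.println(InputFormatConfig.variantFilter(conf)); // null
  }
}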

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java

Lines changed: 36 additions & 8 deletions
@@ -117,15 +117,21 @@ static Expression icebergDataFilterFromHiveConf(Configuration conf) {
    * @return Iceberg Filter Expression
    */
   static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
-    if (exprNodeDesc != null) {
-      SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
-      try {
-        return HiveIcebergFilterFactory.generateFilterExpression(sarg);
-      } catch (UnsupportedOperationException e) {
-        LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ", e);
-      }
+    if (exprNodeDesc == null) {
+      InputFormatConfig.setVariantFilter(conf, null);
+      return null;
     }
-    return null;
+
+    Expression icebergFilter = buildFilterExpression(conf, exprNodeDesc, "Iceberg");
+
+    if (Boolean.parseBoolean(conf.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED))) {
+      Expression variantFilter = buildVariantFilter(conf, exprNodeDesc);
+      InputFormatConfig.setVariantFilter(conf, variantFilter);
+    } else {
+      InputFormatConfig.setVariantFilter(conf, null);
+    }
+
+    return icebergFilter;
   }

   /**
@@ -146,6 +152,28 @@ public static Expression residualForTask(FileScanTask task, Configuration conf)
       ).residualFor(task.file().partition());
   }

+  private static Expression buildFilterExpression(
+      Configuration conf, ExprNodeGenericFuncDesc expression, String filterLabel) {
+    try {
+      SearchArgument sarg = ConvertAstToSearchArg.create(conf, expression);
+      return HiveIcebergFilterFactory.generateFilterExpression(sarg);
+    } catch (UnsupportedOperationException e) {
+      LOG.warn("Unable to create {} filter, proceeding without it: ", filterLabel, e);
+      return null;
+    }
+  }
+
+  private static Expression buildVariantFilter(
+      Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
+    VariantFilterRewriter.RewriteResult rewriteResult =
+        VariantFilterRewriter.rewriteForShredding(exprNodeDesc);
+    if (rewriteResult.expression() == null || !rewriteResult.hasVariantRewrite()) {
+      return null;
+    }
+
+    return buildFilterExpression(conf, rewriteResult.expression(), "variant");
+  }
+
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     Expression filter = icebergDataFilterFromHiveConf(job);
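For intuition, consider the hypothetical Hive predicate variant_get(v, '$.price') > 10 on a shredded variant column v. The regular Iceberg filter is built from the unmodified AST (and may come back null, since buildFilterExpression swallows UnsupportedOperationException), while the variant filter is built from the rewritten AST. If the conversion succeeds, the variant filter should be equivalent to this hand-built expression; the column name and literal are illustrative only:

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ExpectedVariantFilterShape {
  public static void main(String[] args) {
    // What buildVariantFilter is expected to yield for variant_get(v, '$.price') > 10
    // after VariantFilterRewriter maps the call onto the shredded column:
    Expression variantFilter = Expressions.greaterThan("v.typed_value.price", 10);
    System.out.println(variantFilter);
  }
}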

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 18 additions & 1 deletion
@@ -38,6 +38,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -281,6 +282,8 @@ public HiveAuthorizationProvider getAuthorizationProvider() {
     return null;
   }

+  private static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
+
   @Override
   public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
     overlayTableProperties(conf, tableDesc, map);
@@ -293,6 +296,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String>

     map.put(ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,
         String.valueOf(allowDataFilesWithinTableLocationOnly));
+    propagateVariantShreddingProperty(tableDesc, map::put);
   }

   @Override
@@ -311,7 +315,7 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
     tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
     SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
         .ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
-
+    propagateVariantShreddingProperty(tableDesc, map::put);
   }

   /**
@@ -356,6 +360,8 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
         jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
       }
     }
+    propagateVariantShreddingProperty(tableDesc, jobConf::set);
+
     try {
       if (!jobConf.getBoolean(ConfVars.HIVE_IN_TEST_IDE.varname, false)) {
         // For running unit test this won't work as maven surefire CP is different than what we have on a cluster:
@@ -1523,6 +1529,17 @@ private static void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }

+  private static void propagateVariantShreddingProperty(
+      TableDesc tableDesc, BiConsumer<String, String> setter) {
+    if (tableDesc == null || tableDesc.getProperties() == null || setter == null) {
+      return;
+    }
+    String value = tableDesc.getProperties().getProperty(VARIANT_SHREDDING_ENABLED);
+    if (value != null) {
+      setter.accept(VARIANT_SHREDDING_ENABLED, value);
+    }
+  }
+
   public StorageHandlerTypes getType() {
     return StorageHandlerTypes.ICEBERG;
   }
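The helper is private, but its behavior is easy to pin down: copy the table-level property into whatever sink it is handed (map::put on the input/output property paths, jobConf::set in configureJobConf). A self-contained mirror of that logic, using plain Properties and a HashMap in place of TableDesc and JobConf:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiConsumer;

public class PropagateVariantShreddingDemo {
  private static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";

  // Mirrors propagateVariantShreddingProperty, minus the TableDesc null guards.
  static void propagate(Properties tableProps, BiConsumer<String, String> setter) {
    String value = tableProps.getProperty(VARIANT_SHREDDING_ENABLED);
    if (value != null) {
      setter.accept(VARIANT_SHREDDING_ENABLED, value);
    }
  }

  public static void main(String[] args) {
    Properties tableProps = new Properties();
    tableProps.setProperty(VARIANT_SHREDDING_ENABLED, "true");

    Map<String, String> jobProperties = new HashMap<>();
    propagate(tableProps, jobProperties::put);
    System.out.println(jobProperties); // {variant.shredding.enabled=true}
  }
}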
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/VariantFilterRewriter.java

Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTryVariantGet;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFVariantGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class VariantFilterRewriter {
+  private static final Logger LOG = LoggerFactory.getLogger(VariantFilterRewriter.class);
+
+  private VariantFilterRewriter() {
+  }
+
+  static RewriteResult rewriteForShredding(ExprNodeGenericFuncDesc predicate) {
+    if (predicate == null) {
+      return RewriteResult.none();
+    }
+
+    ExprNodeGenericFuncDesc cloned = (ExprNodeGenericFuncDesc) predicate.clone();
+    ExprNodeDesc rewrittenRoot = rewriteNode(cloned);
+    if (rewrittenRoot instanceof ExprNodeGenericFuncDesc) {
+      return RewriteResult.of((ExprNodeGenericFuncDesc) rewrittenRoot);
+    }
+
+    // If rewrites ended up replacing the root, fall back to the original predicate clone to avoid
+    // changing the expected root type.
+    return RewriteResult.of(cloned);
+  }
+
+  private static ExprNodeDesc rewriteNode(ExprNodeDesc node) {
+    if (node == null) {
+      return null;
+    }
+
+    if (node instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) node;
+      List<ExprNodeDesc> children = funcDesc.getChildren();
+      if (children != null) {
+        for (int i = 0; i < children.size(); i++) {
+          ExprNodeDesc rewrittenChild = rewriteNode(children.get(i));
+          children.set(i, rewrittenChild);
+        }
+      }
+
+      if (isVariantGet(funcDesc)) {
+        ExprNodeDesc replacement = rewriteVariantFunction(funcDesc);
+        if (replacement != null) {
+          return replacement;
+        }
+      }
+    }
+
+    return node;
+  }
+
+  private static boolean isVariantGet(ExprNodeGenericFuncDesc funcDesc) {
+    GenericUDF udf = funcDesc.getGenericUDF();
+    return udf instanceof GenericUDFVariantGet || udf instanceof GenericUDFTryVariantGet;
+  }
+
+  private static ExprNodeDesc rewriteVariantFunction(ExprNodeGenericFuncDesc funcDesc) {
+    List<ExprNodeDesc> args = funcDesc.getChildren();
+    if (args == null || args.size() < 2) {
+      return null;
+    }
+
+    ExprNodeDesc variantColumn = args.get(0);
+    ExprNodeDesc jsonPathDesc = args.get(1);
+
+    if (!(variantColumn instanceof ExprNodeColumnDesc) ||
+        !(jsonPathDesc instanceof ExprNodeConstantDesc)) {
+      return null;
+    }
+
+    Object literal = ((ExprNodeConstantDesc) jsonPathDesc).getValue();
+    if (!(literal instanceof String)) {
+      return null;
+    }
+
+    String fieldName = extractTopLevelField((String) literal);
+    if (fieldName == null) {
+      return null;
+    }
+
+    ExprNodeColumnDesc variantColumnDesc = (ExprNodeColumnDesc) variantColumn;
+    String shreddedColumn = variantColumnDesc.getColumn() + ".typed_value." + fieldName;
+    LOG.debug("Rewriting variant predicate to use shredded column {}", shreddedColumn);
+    return new ExprNodeColumnDesc(
+        funcDesc.getTypeInfo(),
+        shreddedColumn,
+        variantColumnDesc.getTabAlias(),
+        variantColumnDesc.getIsPartitionColOrVirtualCol());
+  }
+
+  private static String extractTopLevelField(String jsonPath) {
+    if (jsonPath == null) {
+      return null;
+    }
+
+    String trimmed = jsonPath.trim();
+    if (!trimmed.startsWith("$.") || trimmed.length() <= 2) {
+      return null;
+    }
+
+    String remaining = trimmed.substring(2);
+    // Only allow top-level field names (no nested objects or arrays)
+    if (remaining.contains(".") || remaining.contains("[") || remaining.contains("]")) {
+      return null;
+    }
+
+    return remaining;
+  }
+
+  private static boolean containsShreddedReference(ExprNodeDesc node) {
+    if (node == null) {
+      return false;
+    }
+
+    if (node instanceof ExprNodeColumnDesc) {
+      String column = ((ExprNodeColumnDesc) node).getColumn();
+      return column != null && column.contains(".typed_value.");
+    }
+
+    List<ExprNodeDesc> children = node.getChildren();
+    if (children != null) {
+      for (ExprNodeDesc child : children) {
+        if (containsShreddedReference(child)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  static final class RewriteResult {
+    private final ExprNodeGenericFuncDesc expression;
+    private final boolean rewritten;
+
+    private RewriteResult(ExprNodeGenericFuncDesc expression, boolean rewritten) {
+      this.expression = expression;
+      this.rewritten = rewritten;
+    }
+
+    private static final RewriteResult NONE = new RewriteResult(null, false);
+
+    static RewriteResult none() {
+      return NONE;
+    }
+
+    static RewriteResult of(ExprNodeGenericFuncDesc expression) {
+      if (expression == null) {
+        return none();
+      }
+      return new RewriteResult(expression, containsShreddedReference(expression));
+    }
+
+    ExprNodeGenericFuncDesc expression() {
+      return expression;
+    }
+
+    boolean hasVariantRewrite() {
+      return rewritten;
+    }
+  }
+}
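The rewriter is deliberately conservative: only $.field paths qualify, so nested objects and array indexing fall back to the unshredded variant read. A standalone rerun of the extractTopLevelField rule; the demo class is illustrative, not part of the patch:

public class TopLevelFieldDemo {
  // Same rule as VariantFilterRewriter.extractTopLevelField above.
  static String extract(String jsonPath) {
    if (jsonPath == null) {
      return null;
    }
    String trimmed = jsonPath.trim();
    if (!trimmed.startsWith("$.") || trimmed.length() <= 2) {
      return null;
    }
    String remaining = trimmed.substring(2);
    // No nested objects or array access: those stay on the unrewritten path.
    if (remaining.contains(".") || remaining.contains("[") || remaining.contains("]")) {
      return null;
    }
    return remaining;
  }

  public static void main(String[] args) {
    System.out.println(extract("$.price"));   // price
    System.out.println(extract("$.a.b"));     // null (nested object)
    System.out.println(extract("$.tags[0]")); // null (array access)
    System.out.println(extract("$"));         // null (no field)
  }
}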

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java

Lines changed: 8 additions & 0 deletions
@@ -60,6 +60,7 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.orc.ORC;
@@ -225,6 +226,12 @@ private CloseableIterable<T> newParquetIterable(
         .caseSensitive(isCaseSensitive())
         .split(task.start(), task.length());

+    Expression variantFilterExpr =
+        InputFormatConfig.variantFilter(getContext().getConfiguration());
+    if (variantFilterExpr != null) {
+      parquetReadBuilder.variantFilter(variantFilterExpr);
+    }
+
     if (isReuseContainers()) {
       parquetReadBuilder.reuseContainers();
     }
@@ -290,4 +297,5 @@ private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integ

     return TypeUtil.selectNot(readSchema, collect);
   }
+
 }
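Putting the pieces together, the handshake between split planning and the Parquet reader reduces to a set-then-get through the job configuration. A hedged end-to-end sketch: the filter itself is a stand-in, and the variantFilter(...) hook on the Parquet read builder appears to come from this change set rather than stock Iceberg, so only the configuration round trip is exercised here:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.InputFormatConfig;

public class VariantFilterHandshake {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Planner side (HiveIcebergInputFormat.getFilterExpr): stash the rewritten filter.
    InputFormatConfig.setVariantFilter(conf, Expressions.greaterThan("v.typed_value.price", 10));

    // Reader side (IcebergRecordReader.newParquetIterable): pull it back out; the
    // null guard keeps behavior unchanged when no variant filter was planned.
    Expression variantFilterExpr = InputFormatConfig.variantFilter(conf);
    if (variantFilterExpr != null) {
      System.out.println("would call parquetReadBuilder.variantFilter(...) with: " + variantFilterExpr);
    }
  }
}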
