Skip to content

Commit 64adbf8

Browse files
author
jerryyue
committed
optimize multiple partial update code
1 parent cef2e8b commit 64adbf8

File tree

23 files changed

+760
-285
lines changed

23 files changed

+760
-285
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
4343
try {
4444
// De-dupe/merge if needed
4545
I dedupedRecords =
46-
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema());
46+
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
4747

4848
Instant lookupBegin = Instant.now();
4949
I taggedRecords = dedupedRecords;
@@ -69,8 +69,8 @@ protected abstract I tag(
6969
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);
7070

7171
public I combineOnCondition(
72-
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, String schemaString) {
73-
return condition ? deduplicateRecords(records, table, parallelism, schemaString) : records;
72+
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
73+
return condition ? deduplicateRecords(records, table, parallelism) : records;
7474
}
7575

7676
/**
@@ -81,8 +81,8 @@ public I combineOnCondition(
8181
* @return Collection of HoodieRecord that has already been deduplicated
8282
*/
8383
public I deduplicateRecords(
84-
I records, HoodieTable<T, I, K, O> table, int parallelism, String schemaString) {
85-
return deduplicateRecords(records, table.getIndex(), parallelism, schemaString);
84+
I records, HoodieTable<T, I, K, O> table, int parallelism) {
85+
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
8686
}
8787

8888
public abstract I deduplicateRecords(

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hudi.table.action.commit;
2020

2121
import org.apache.hudi.client.WriteStatus;
22+
import org.apache.hudi.common.config.SerializableSchema;
2223
import org.apache.hudi.common.data.HoodieData;
2324
import org.apache.hudi.common.engine.HoodieEngineContext;
2425
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -55,16 +56,15 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
5556
public HoodieData<HoodieRecord<T>> deduplicateRecords(
5657
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaString) {
5758
boolean isIndexingGlobal = index.isGlobal();
58-
Properties properties = new Properties();
59-
properties.put("schema", schemaString);
59+
final SerializableSchema schema = new SerializableSchema(schemaString);
6060
return records.mapToPair(record -> {
6161
HoodieKey hoodieKey = record.getKey();
6262
// If index used is global, then records are expected to differ in their partitionPath
6363
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
6464
return Pair.of(key, record);
6565
}).reduceByKey((rec1, rec2) -> {
6666
@SuppressWarnings("unchecked")
67-
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties);
67+
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
6868
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
6969

7070
return new HoodieAvroRecord<>(reducedKey, reducedData);

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.table.action.commit;
2020

21+
import org.apache.avro.Schema;
2122
import org.apache.hudi.client.WriteStatus;
2223
import org.apache.hudi.common.data.HoodieListData;
2324
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,9 +99,8 @@ public List<HoodieRecord<T>> deduplicateRecords(
9899
final T data1 = rec1.getData();
99100
final T data2 = rec2.getData();
100101

101-
Properties properties = new Properties();
102-
properties.put("schema", schemaString);
103-
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties);
102+
final Schema schema = new Schema.Parser().parse(schemaString);
103+
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
104104
// we cannot allow the user to change the key or partitionPath, since that will affect
105105
// everything
106106
// so pick it from one of the records.

hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hudi.common.model.HoodieOperation;
2323
import org.apache.hudi.common.model.HoodieRecord;
2424
import org.apache.hudi.common.model.HoodieRecordPayload;
25-
import org.apache.hudi.common.model.MultiplePartialUpdateUnit;
2625
import org.apache.hudi.common.util.Option;
2726
import org.apache.hudi.common.util.StringUtils;
2827
import org.apache.hudi.common.util.collection.Pair;
@@ -497,17 +496,6 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel
497496
return obj == null ? "" : StringUtils.objToString(obj);
498497
}
499498

500-
public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) {
501-
MultiplePartialUpdateUnit multipleOrderingVal2ColsInfo = new MultiplePartialUpdateUnit(fieldMappings);
502-
multipleOrderingVal2ColsInfo.getMultiplePartialUpdateUnits().forEach(orderingVal2ColsInfo -> {
503-
Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled);
504-
if (Objects.nonNull(val)) {
505-
orderingVal2ColsInfo.setOrderingValue(val.toString());
506-
}
507-
});
508-
return multipleOrderingVal2ColsInfo.toString();
509-
}
510-
511499
/**
512500
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
513501
*/

hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public SerializableSchema() {
3838
public SerializableSchema(Schema schema) {
3939
this.schema = newCopy(schema);
4040
}
41+
42+
public SerializableSchema(String schemaStr) {
43+
this.schema = new Schema.Parser().parse(schemaStr);
44+
}
4145

4246
public SerializableSchema(SerializableSchema serializableSchema) {
4347
this(serializableSchema.schema);

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
5858
return preCombine(oldValue);
5959
}
6060

61+
/**
62+
* When more than one HoodieRecord has the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema.
63+
* Implementations can leverage the schema when applying their business logic in preCombine.
64+
*
65+
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
66+
* @param schema Payload related schema. For example, use the schema to overwrite the old instance for specified fields that do not equal the default value.
67+
* @param properties Payload related properties. For example, pass the ordering field name(s) used to extract the ordering value from the value in storage.
68+
* @return the combined value
69+
*/
70+
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
71+
default T preCombine(T oldValue, Schema schema, Properties properties) {
72+
return preCombine(oldValue, properties);
73+
}
74+
6175
/**
6276
* This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
6377
*/

hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.avro.generic.GenericRecord;
2323
import org.apache.avro.generic.GenericRecordBuilder;
2424
import org.apache.avro.generic.IndexedRecord;
25-
2625
import org.apache.hudi.common.util.Option;
2726

2827
import java.io.IOException;
@@ -58,26 +57,43 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
5857
GenericRecord insertRecord = (GenericRecord) recordOption.get();
5958
GenericRecord currentRecord = (GenericRecord) currentValue;
6059

61-
return getMergedIndexedRecordOption(schema, insertRecord, currentRecord);
60+
return mergeRecords(schema, insertRecord, currentRecord);
6261
}
6362

64-
protected Option<IndexedRecord> getMergedIndexedRecordOption(Schema schema, GenericRecord insertRecord, GenericRecord currentRecord) {
65-
if (isDeleteRecord(insertRecord)) {
63+
/**
64+
* Merges the given records into one.
65+
* The fields in {@code baseRecord} has higher priority:
66+
* a field from {@code baseRecord} is set into the merged record when it is neither null nor equal to the field's default value.
67+
*
68+
* @param schema The record schema
69+
* @param baseRecord The base record to merge with
70+
* @param mergedRecord The record to be merged
71+
*
72+
* @return the merged record option
73+
*/
74+
protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
75+
if (isDeleteRecord(baseRecord)) {
6676
return Option.empty();
6777
} else {
6878
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
6979
List<Schema.Field> fields = schema.getFields();
70-
fields.forEach(field -> {
71-
Object value = insertRecord.get(field.name());
72-
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
73-
Object defaultValue = field.defaultVal();
74-
if (!overwriteField(value, defaultValue)) {
75-
builder.set(field, value);
76-
} else {
77-
builder.set(field, currentRecord.get(field.pos()));
78-
}
79-
});
80+
fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field));
8081
return Option.of(builder.build());
8182
}
8283
}
84+
85+
protected void setField(
86+
GenericRecord baseRecord,
87+
GenericRecord mergedRecord,
88+
GenericRecordBuilder builder,
89+
Schema.Field field) {
90+
Object value = baseRecord.get(field.name());
91+
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
92+
Object defaultValue = field.defaultVal();
93+
if (!overwriteField(value, defaultValue)) {
94+
builder.set(field, value);
95+
} else {
96+
builder.set(field, mergedRecord.get(field.name()));
97+
}
98+
}
8399
}

0 commit comments

Comments
 (0)