Skip to content

Commit 07e57d8

Browse files
jerryyueXuQianJin-Stars
authored andcommitted
optimize multiple partial update code
1 parent 2ebd741 commit 07e57d8

File tree

27 files changed

+1188
-397
lines changed

27 files changed

+1188
-397
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.apache.hudi.cli;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
6+
public class Test {
7+
public static void main(String[] args) {
8+
String str1 = "fa=:a,b,c";
9+
String str2 = "fa:a,b,c";
10+
String str3 = "fa=1:a,";
11+
System.out.println(Arrays.asList(str3.split("=|:|,")));
12+
13+
}
14+
public List<String> split(String str) {
15+
return null;
16+
}
17+
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
4343
try {
4444
// De-dupe/merge if needed
4545
I dedupedRecords =
46-
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema());
46+
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
4747

4848
Instant lookupBegin = Instant.now();
4949
I taggedRecords = dedupedRecords;
@@ -69,8 +69,8 @@ protected abstract I tag(
6969
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);
7070

7171
public I combineOnCondition(
72-
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, String schemaString) {
73-
return condition ? deduplicateRecords(records, table, parallelism, schemaString) : records;
72+
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
73+
return condition ? deduplicateRecords(records, table, parallelism) : records;
7474
}
7575

7676
/**
@@ -81,8 +81,8 @@ public I combineOnCondition(
8181
* @return Collection of HoodieRecord already be deduplicated
8282
*/
8383
public I deduplicateRecords(
84-
I records, HoodieTable<T, I, K, O> table, int parallelism, String schemaString) {
85-
return deduplicateRecords(records, table.getIndex(), parallelism, schemaString);
84+
I records, HoodieTable<T, I, K, O> table, int parallelism) {
85+
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
8686
}
8787

8888
public abstract I deduplicateRecords(

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hudi.table.action.commit;
2020

2121
import org.apache.hudi.client.WriteStatus;
22+
import org.apache.hudi.common.config.SerializableSchema;
2223
import org.apache.hudi.common.data.HoodieData;
2324
import org.apache.hudi.common.engine.HoodieEngineContext;
2425
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -55,16 +56,15 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
5556
public HoodieData<HoodieRecord<T>> deduplicateRecords(
5657
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaString) {
5758
boolean isIndexingGlobal = index.isGlobal();
58-
Properties properties = new Properties();
59-
properties.put("schema", schemaString);
59+
final SerializableSchema schema = new SerializableSchema(schemaString);
6060
return records.mapToPair(record -> {
6161
HoodieKey hoodieKey = record.getKey();
6262
// If index used is global, then records are expected to differ in their partitionPath
6363
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
6464
return Pair.of(key, record);
6565
}).reduceByKey((rec1, rec2) -> {
6666
@SuppressWarnings("unchecked")
67-
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties);
67+
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
6868
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
6969

7070
return new HoodieAvroRecord<>(reducedKey, reducedData);

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.table.action.commit;
2020

21+
import org.apache.avro.Schema;
2122
import org.apache.hudi.client.WriteStatus;
2223
import org.apache.hudi.common.data.HoodieListData;
2324
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,9 +99,8 @@ public List<HoodieRecord<T>> deduplicateRecords(
9899
final T data1 = rec1.getData();
99100
final T data2 = rec2.getData();
100101

101-
Properties properties = new Properties();
102-
properties.put("schema", schemaString);
103-
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties);
102+
final Schema schema = new Schema.Parser().parse(schemaString);
103+
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
104104
// we cannot allow the user to change the key or partitionPath, since that will affect
105105
// everything
106106
// so pick it from one of the records.

hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hudi.common.model.HoodieOperation;
2323
import org.apache.hudi.common.model.HoodieRecord;
2424
import org.apache.hudi.common.model.HoodieRecordPayload;
25-
import org.apache.hudi.common.model.MultipleOrderingVal2ColsInfo;
2625
import org.apache.hudi.common.util.Option;
2726
import org.apache.hudi.common.util.StringUtils;
2827
import org.apache.hudi.common.util.collection.Pair;
@@ -497,17 +496,6 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel
497496
return obj == null ? "" : StringUtils.objToString(obj);
498497
}
499498

500-
public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) {
501-
MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(fieldMappings);
502-
multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> {
503-
Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled);
504-
if (Objects.nonNull(val)) {
505-
orderingVal2ColsInfo.setOrderingValue(val.toString());
506-
}
507-
});
508-
return multipleOrderingVal2ColsInfo.generateOrderingText();
509-
}
510-
511499
/**
512500
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
513501
*/

hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public SerializableSchema() {
3838
public SerializableSchema(Schema schema) {
3939
this.schema = newCopy(schema);
4040
}
41+
42+
public SerializableSchema(String schemaStr) {
43+
this.schema = new Schema.Parser().parse(schemaStr);
44+
}
4145

4246
public SerializableSchema(SerializableSchema serializableSchema) {
4347
this(serializableSchema.schema);

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
5858
return preCombine(oldValue);
5959
}
6060

61+
/**
62+
* When more than one HoodieRecord have the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema.
63+
* Implementation can leverage the schema to decide their business logic to do preCombine.
64+
*
65+
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
66+
* @param schema Payload related schema. For example use schema to overwrite old instance for specified fields that doesn't equal to default value.
67+
* @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.
68+
* @return the combined value
69+
*/
70+
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
71+
default T preCombine(T oldValue, Schema schema, Properties properties) {
72+
return preCombine(oldValue, properties);
73+
}
74+
6175
/**
6276
* This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
6377
*/

hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java

Lines changed: 0 additions & 97 deletions
This file was deleted.

0 commit comments

Comments
 (0)