diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 9e0c34d620618..4e93a9fdefd84 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -43,7 +44,7 @@ public HoodieWriteMetadata write(String instantTime, try { // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema()); + combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -69,8 +70,8 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table, String schemaString) { - return condition ? deduplicateRecords(records, table, parallelism, schemaString) : records; + boolean condition, I records, int parallelism, HoodieTable table, HoodieWriteConfig writeConfig) { + return condition ? deduplicateRecords(records, table, parallelism, writeConfig) : records; } /** @@ -81,10 +82,10 @@ public I combineOnCondition( * @return Collection of HoodieRecord already be deduplicated */ public I deduplicateRecords( - I records, HoodieTable table, int parallelism, String schemaString) { - return deduplicateRecords(records, table.getIndex(), parallelism, schemaString); + I records, HoodieTable table, int parallelism, HoodieWriteConfig writeConfig) { + return deduplicateRecords(records, table.getIndex(), parallelism, writeConfig); } public abstract I deduplicateRecords( - I records, HoodieIndex index, int parallelism, String schemaString); + I records, HoodieIndex index, int parallelism, HoodieWriteConfig writeConfig); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index 1acde8f2abc4e..d3b9501805f51 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -26,10 +26,10 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; -import java.util.Properties; public class HoodieWriteHelper extends BaseWriteHelper>, HoodieData, HoodieData, R> { @@ -53,10 +53,8 @@ protected HoodieData> tag(HoodieData> dedupedRec @Override public HoodieData> deduplicateRecords( - HoodieData> records, HoodieIndex index, int parallelism, String schemaString) { + 
HoodieData> records, HoodieIndex index, int parallelism, HoodieWriteConfig writeConfig) { boolean isIndexingGlobal = index.isGlobal(); - Properties properties = new Properties(); - properties.put("schema", schemaString); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath @@ -64,7 +62,7 @@ public HoodieData> deduplicateRecords( return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties); + T reducedData = (T) rec2.getData().preCombine(rec1.getData(), writeConfig.getProps()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index a06230bf03f54..956c179d7f9d2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -37,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; import java.util.stream.Collectors; /** @@ -89,7 +89,7 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism, String schemaString) { + List> records, HoodieIndex index, int parallelism, HoodieWriteConfig writeConfig) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); @@ -98,9 +98,7 @@ public List> deduplicateRecords( final T data1 = rec1.getData(); final T data2 = rec2.getData(); - Properties properties = new Properties(); - properties.put("schema", schemaString); - @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties); + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, writeConfig.getProps()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java index 3060611f2b9fc..fdf88aa2db671 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/commit/FlinkWriteHelperTest.java @@ -26,6 +26,8 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,6 +50,8 @@ public class FlinkWriteHelperTest { private String preCombineFields = ""; + private HoodieWriteConfig writeConfig; + public static final String SCHEMA = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" @@ -67,12 +71,18 @@ public class FlinkWriteHelperTest { public void setUp() throws Exception { this.preCombineFields = "_ts1:fa;_ts2:fb"; this.avroSchema = new Schema.Parser().parse(SCHEMA); + this.writeConfig = HoodieWriteConfig + .newBuilder() + .withPath(tempFile.getAbsolutePath()) + .withSchema(SCHEMA) + .withPreCombineField(preCombineFields) + .build(); } @Test void deduplicateRecords() throws IOException, InterruptedException { List records = data(); - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.avroSchema.toString()); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeConfig); GenericRecord record = HoodieAvroUtils.bytesToAvro(((PartialUpdateAvroPayload) records.get(0).getData()).recordBytes, this.avroSchema); System.out.println("======================================================================================"); System.out.println("last: " + record); @@ -94,15 +104,13 @@ public List data() throws InterruptedException { row2.put("_ts2", ts); records.add(row1); records.add(row2); - Thread.sleep(1); + //Thread.sleep(1); } return records.stream().map(genericRowData -> { try { - String orderingFieldValText = HoodieAvroUtils.getMultipleNestedFieldVals(genericRowData, - preCombineFields, false).toString(); return new HoodieAvroRecord(new HoodieKey("1", "default"), - new PartialUpdateAvroPayload(genericRowData, orderingFieldValText), HoodieOperation.INSERT); + new PartialUpdateAvroPayload(Option.of(genericRowData)), HoodieOperation.INSERT); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 3f56c41ee5f97..1fdb47765af30 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -103,7 +103,7 @@ public List bulkInsert(List> inputRecords, if (performDedupe) { dedupedRecords = (List>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, 
table, config); } final List> repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 28a41f3a93366..06b789bb8340a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -100,7 +100,7 @@ public HoodieData bulkInsert(HoodieData> inputRecor if (performDedupe) { dedupedRecords = (HoodieData>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, table, config); } // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 633fa0a205adb..9ea548dbcf541 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.MultipleOrderingVal2ColsInfo; +import org.apache.hudi.common.model.MultiplePartialUpdateUnit; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -498,14 +498,14 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel } public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) { - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(fieldMappings); - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { + MultiplePartialUpdateUnit multipleOrderingVal2ColsInfo = new MultiplePartialUpdateUnit(fieldMappings); + multipleOrderingVal2ColsInfo.getMultiplePartialUpdateUnits().forEach(orderingVal2ColsInfo -> { Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled); if (Objects.nonNull(val)) { orderingVal2ColsInfo.setOrderingValue(val.toString()); } }); - return multipleOrderingVal2ColsInfo.generateOrderingText(); + return multipleOrderingVal2ColsInfo.toString(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java deleted file mode 100644 index 9bea5151ffb00..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/MultipleOrderingVal2ColsInfo.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.common.model; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -/** - * MultipleOrderingVal2ColsInfo - * _ts1=999:name1,price1;_ts2=111:name2,price2 - * _ts1:name1,price1=999;_ts2:name2,price2=111 - */ -public class MultipleOrderingVal2ColsInfo { - private List orderingVal2ColsInfoList = new ArrayList<>(); - - public MultipleOrderingVal2ColsInfo(String multipleOrderingFieldsWithColsText) { - for (String orderingFieldWithColsText : multipleOrderingFieldsWithColsText.split(";")) { - if (orderingFieldWithColsText == null || orderingFieldWithColsText.isEmpty()) { - continue; - } - OrderingVal2ColsInfo orderingVal2ColsInfo = new OrderingVal2ColsInfo(orderingFieldWithColsText); - orderingVal2ColsInfoList.add(orderingVal2ColsInfo); - } - } - - public List getOrderingVal2ColsInfoList() { - return orderingVal2ColsInfoList; - } - - public String generateOrderingText() { - StringBuilder sb = new StringBuilder(); - orderingVal2ColsInfoList.stream().forEach(orderingVal2ColsInfo -> { - sb.append(orderingVal2ColsInfo.orderingField); - sb.append("="); - if (Objects.nonNull(orderingVal2ColsInfo.orderingValue)) { - sb.append(orderingVal2ColsInfo.orderingValue); - } - sb.append(":"); - sb.append(String.join(",", orderingVal2ColsInfo.getColumnNames())); - sb.append(";"); - }); - sb.deleteCharAt(sb.length() - 1); - - return sb.toString(); - } - - public class OrderingVal2ColsInfo { - private String orderingField; - private String orderingValue = ""; - private List columnNames; - - public OrderingVal2ColsInfo(String orderingFieldWithColsText) { - String[] orderInfo2ColsArr = orderingFieldWithColsText.split(":"); - String[] orderingField2Value = orderInfo2ColsArr[0].split("="); - String[] columnArr = orderInfo2ColsArr[1].split(","); - this.orderingField = orderingField2Value[0]; - if (orderingField2Value.length > 1) { - this.orderingValue = orderingField2Value[1]; - } - this.columnNames = Arrays.asList(columnArr); - } - - public String getOrderingField() { - return orderingField; - } - - public String getOrderingValue() { - return orderingValue; - } - - public void setOrderingValue(String value) { - this.orderingValue = value; - } - - public List getColumnNames() { - return columnNames; - } - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MultiplePartialUpdateUnit.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MultiplePartialUpdateUnit.java new file mode 100644 index 0000000000000..7b02251736383 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MultiplePartialUpdateUnit.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * MultiplePartialUpdateUnit parses a multiple-ordering-field spec such as + * _ts1=999:name1,price1;_ts2=111:name2,price2 + * _ts1:name1,price1=999;_ts2:name2,price2=111 + */ +public class MultiplePartialUpdateUnit { + private List<PartialUpdateUnit> multiplePartialUpdateUnits = Collections.emptyList(); + + public MultiplePartialUpdateUnit(String multipleUpdateUnitText) { + this.multiplePartialUpdateUnits = + Arrays.stream(multipleUpdateUnitText.split(";")).filter(Objects::nonNull) + .map(PartialUpdateUnit::new).collect(Collectors.toList()); + } + + public List<PartialUpdateUnit> getMultiplePartialUpdateUnits() { + return multiplePartialUpdateUnits; + } + + public class PartialUpdateUnit { + private String orderingField; + private String orderingValue = ""; + private List<String> columnNames; + + public PartialUpdateUnit(String partialUpdateUnitText) { + List<String> partialUpdateList = Arrays.asList(partialUpdateUnitText.split(":|,")); + this.orderingField = partialUpdateList.get(0); + this.columnNames = partialUpdateList.subList(1, partialUpdateList.size()); + } + + public String getOrderingField() { + return orderingField; + } + + public String getOrderingValue() { + return orderingValue; + } + + public void setOrderingValue(String value) { + this.orderingValue = value; + } + + public List<String> getColumnNames() { + return columnNames; + } + + @Override + public String toString() { + return String.format("%s=%s:%s", this.orderingField, this.orderingValue, String.join(",", this.columnNames)); + } + } + + @Override + public String toString() { + int len = multiplePartialUpdateUnits.size(); + return len > 1 ?
this.multiplePartialUpdateUnits + .stream() + .map(PartialUpdateUnit::toString) + .collect(Collectors.joining(";")) : multiplePartialUpdateUnits.get(0).toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 97815e4e3a22f..250ed49711639 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -18,12 +18,13 @@ package org.apache.hudi.common.model; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; - -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import java.io.IOException; import java.util.Map; @@ -45,6 +46,8 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro public static ConcurrentHashMap schemaRepo = new ConcurrentHashMap<>(); + private MultiplePartialUpdateUnit multiplePartialUpdateUnit = null; + public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @@ -55,7 +58,7 @@ public PartialUpdateAvroPayload(Option record) { @Override public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties) { - String schemaStringIn = properties.getProperty("schema"); + String schemaStringIn = properties.getProperty("hoodie.avro.schema"); Schema schemaInstance; if (!schemaRepo.containsKey(schemaStringIn)) { schemaInstance = new Schema.Parser().parse(schemaStringIn); @@ -70,11 +73,10 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal try { GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schemaInstance).get(); - Option optValue = combineAndGetUpdateValue(indexedOldValue, schemaInstance, this.orderingVal.toString()); + Option optValue = combineAndGetUpdateValue(indexedOldValue, schemaInstance, properties); // Rebuild ordering value if required - String newOrderingFieldWithColsText = rebuildWithNewOrderingVal((GenericRecord) optValue.get(), this.orderingVal.toString()); if (optValue.isPresent()) { - return new PartialUpdateAvroPayload((GenericRecord) optValue.get(), newOrderingFieldWithColsText); + return new PartialUpdateAvroPayload(Option.of((GenericRecord) optValue.get())); } } catch (Exception ex) { return this; @@ -83,62 +85,35 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal } public Option combineAndGetUpdateValue( - IndexedRecord currentValue, Schema schema, String multipleOrderingFieldsWithCols) throws IOException { + IndexedRecord currentValue, Schema schema, MultiplePartialUpdateUnit multiplePartialUpdateUnit) throws IOException { Option incomingRecord = getInsertValue(schema); if (!incomingRecord.isPresent()) { return Option.empty(); } // Perform a deserialization again to prevent resultRecord from sharing the same reference as recordOption - GenericRecord resultRecord = (GenericRecord) getInsertValue(schema).get(); + GenericRecord resultRecord = (GenericRecord) incomingRecord.get(); Map name2Field = schema.getFields().stream().collect(Collectors.toMap(Schema.Field::name, item -> item)); // multipleOrderingFieldsWithCols = 
_ts1:name1,price1=999;_ts2:name2,price2=; - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(multipleOrderingFieldsWithCols); final Boolean[] deleteFlag = new Boolean[1]; deleteFlag[0] = false; - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { - String persistOrderingVal = HoodieAvroUtils.getNestedFieldValAsString( - (GenericRecord) currentValue, orderingVal2ColsInfo.getOrderingField(), true, false); - if (persistOrderingVal == null) { - persistOrderingVal = ""; - } - - // No update required - if (persistOrderingVal.isEmpty() && orderingVal2ColsInfo.getOrderingField().isEmpty()) { - return; - } - - // Pick the payload with greatest ordering value as insert record - boolean needUpdatePersistData = false; - try { - if (persistOrderingVal == null || (orderingVal2ColsInfo.getOrderingValue() != null - && persistOrderingVal.compareTo(orderingVal2ColsInfo.getOrderingValue()) <= 0)) { - needUpdatePersistData = true; - } - } catch (NumberFormatException e) { - if (persistOrderingVal.compareTo(orderingVal2ColsInfo.getOrderingValue()) < 0) { - needUpdatePersistData = true; - } - } + multiplePartialUpdateUnit.getMultiplePartialUpdateUnits().forEach(partialUpdateUnit -> { // Initialise the fields of the sub-tables - GenericRecord insertRecord; - if (!needUpdatePersistData) { + GenericRecord insertRecord = resultRecord; + boolean needUseOldRecordToUpdate = needUseOldRecordToUpdate(resultRecord, (GenericRecord) currentValue, partialUpdateUnit); + if (needUseOldRecordToUpdate) { insertRecord = (GenericRecord) currentValue; // resultRecord is already assigned as recordOption - orderingVal2ColsInfo.getColumnNames().stream() + GenericRecord finalInsertRecord = insertRecord; + partialUpdateUnit.getColumnNames().stream() .filter(name2Field::containsKey) - .forEach(fieldName -> resultRecord.put(fieldName, insertRecord.get(fieldName))); - resultRecord.put(orderingVal2ColsInfo.getOrderingField(), Long.parseLong(persistOrderingVal)); - } else { - insertRecord = (GenericRecord) incomingRecord.get(); - orderingVal2ColsInfo.getColumnNames().stream() - .filter(name2Field::containsKey) - .forEach(fieldName -> resultRecord.put(fieldName, insertRecord.get(fieldName))); + .forEach(fieldName -> resultRecord.put(fieldName, finalInsertRecord.get(fieldName))); + String oldOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(finalInsertRecord, partialUpdateUnit.getOrderingField(), true, false); + resultRecord.put(partialUpdateUnit.getOrderingField(), Long.parseLong(oldOrderingVal)); } - // If any of the sub-table records is flagged for deletion, delete entire row if (isDeleteRecord(insertRecord)) { deleteFlag[0] = true; @@ -153,7 +128,14 @@ public Option combineAndGetUpdateValue( @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - return this.combineAndGetUpdateValue(currentValue, schema, this.orderingVal.toString()); + return this.combineAndGetUpdateValue(currentValue, schema, this.multiplePartialUpdateUnit); + } + + public boolean needUseOldRecordToUpdate(GenericRecord incomingRecord, GenericRecord currentRecord, MultiplePartialUpdateUnit.PartialUpdateUnit partialUpdateUnit) { + String orderingField = partialUpdateUnit.getOrderingField(); + Comparable currentOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(currentRecord, orderingField, true, false); + Comparable incomingOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(incomingRecord, orderingField, true, 
false); + return Objects.isNull(incomingOrderingVal) || (Objects.nonNull(currentOrderingVal) && currentOrderingVal.compareTo(incomingOrderingVal) > 0); } @Override @@ -162,22 +144,10 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue if (!recordOption.isPresent()) { return Option.empty(); } - String orderingFieldWithColsText = rebuildWithNewOrderingVal( - (GenericRecord) recordOption.get(), this.orderingVal.toString()); - return combineAndGetUpdateValue(currentValue, schema, orderingFieldWithColsText); - } - - private static String rebuildWithNewOrderingVal(GenericRecord record, String orderingFieldWithColsText) { - MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(orderingFieldWithColsText); - multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> { - Object orderingVal = record.get(orderingVal2ColsInfo.getOrderingField()); - if (Objects.nonNull(orderingVal)) { - orderingVal2ColsInfo.setOrderingValue(orderingVal.toString()); - } else { - orderingVal2ColsInfo.setOrderingValue("-1"); - } - }); - return multipleOrderingVal2ColsInfo.generateOrderingText(); + if (multiplePartialUpdateUnit == null) { + this.multiplePartialUpdateUnit = new MultiplePartialUpdateUnit(this.orderingVal.toString()); + } + return combineAndGetUpdateValue(currentValue, schema, this.multiplePartialUpdateUnit); } /** @@ -186,4 +156,11 @@ private static String rebuildWithNewOrderingVal(GenericRecord record, String ord public Boolean overwriteField(Object value, Object defaultValue) { return value == null; } + + public static boolean isMultipleOrderFields(String preCombineField) { + if (!StringUtils.isNullOrEmpty(preCombineField) && preCombineField.split(":").length > 1) { + return true; + } + return false; + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index ad77e13b46549..097775a1b1c48 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -83,6 +83,19 @@ public class TestHoodieAvroUtils { + "{\"name\": \"non_nullable_field_wo_default\",\"type\": \"string\"}," + "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}"; + private static String SCHEMA_WITH_MULTIPLE_PARTIAL_UPDATE = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"fa\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"fb\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_ts1\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"fc\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_ts2\", \"type\": [\"null\", \"long\"]}\n" + + " ]\n" + + "}"; + private static String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT = "{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"non_pii_col\", \"type\": \"string\"}," @@ -380,4 +393,19 @@ public void testConvertDaysToDate() { int days = HoodieAvroUtils.fromJavaDate(now); assertEquals(now.toLocalDate(), HoodieAvroUtils.toJavaDate(days).toLocalDate()); } + + @Test + public void testGetMultipleNestedFieldVals() { + + GenericRecord genericRecord = new
GenericData.Record(new Schema.Parser().parse(SCHEMA_WITH_MULTIPLE_PARTIAL_UPDATE)); + genericRecord.put("id", "jerry"); + genericRecord.put("fa", "fa"); + genericRecord.put("fb", "fb"); + genericRecord.put("_ts1", 0L); + genericRecord.put("fc", "fc"); + genericRecord.put("_ts2", 1L); + String preCombineFields = "_ts1:fa,fb;_ts2:fc"; + Object orderingVals = HoodieAvroUtils.getMultipleNestedFieldVals(genericRecord, preCombineFields, false); + Assertions.assertEquals("_ts1=0:fa,fb;_ts2=1:fc", orderingVals); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index d13205718d57b..adccfcd1fd4a6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -439,7 +439,7 @@ private boolean flushBucket(DataBucket bucket) { ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { HoodieWriteConfig writeConfig = getHoodieClientConfig(config); - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig.getSchema()); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig); } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -475,7 +475,7 @@ private void flushRemaining(boolean endInput) { if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { HoodieWriteConfig writeConfig = getHoodieClientConfig(config); - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig.getSchema()); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java index 7df339489261b..c7520abde0f3c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseAvroPayload; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -74,16 +75,10 @@ public static PayloadCreation instance(Configuration conf) throws Exception { } public HoodieRecordPayload createPayload(GenericRecord record) throws Exception { - if (shouldCombine) { + if (shouldCombine && !preCombineField.contains(";")) { ValidationUtils.checkState(preCombineField != null); - Comparable orderingVal; - if (preCombineField.contains(";")) { - // Multi ordering field support - orderingVal = (Comparable) HoodieAvroUtils.getMultipleNestedFieldVals(record, preCombineField, false); - } else { - orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, - 
preCombineField, false, false); - } + Comparable orderingVal = PartialUpdateAvroPayload.isMultipleOrderFields(this.preCombineField) ? this.preCombineField : (Comparable) HoodieAvroUtils.getNestedFieldVal(record, + preCombineField, false, false); return (HoodieRecordPayload) constructor.newInstance(record, orderingVal); } else { return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR01.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR01.java new file mode 100644 index 0000000000000..4781cb0bb1e4c --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR01.java @@ -0,0 +1,165 @@ +package org.apache.hudi.sink; + +import org.apache.commons.math3.random.RandomDataGenerator; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.hudi.common.model.HoodiePayloadProps; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy; +import org.apache.log4j.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +public final class FlinkPartialUpdateMOR01 { + private static final Logger LOG = LoggerFactory.getLogger(FlinkPartialUpdateMOR01.class); + + private static final String sourceTable1 = "source_1"; + private static final String sinkAliasTable1 = "sink_1"; + + private static final String dbName = "hudi_test"; + private final static String targetTable = "hudi_partial_updata_05"; + private static final String warehouse = "hdfs://127.0.0.1:9000/hudi/hudi_db"; + private static final String basePath = warehouse + "/" + dbName + "/" + targetTable; + private static final String metastoreUrl = "thrift://localhost:9083"; + + private FlinkPartialUpdateMOR01() { + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + env.setParallelism(1); + env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); + + Configuration configuration = tableEnv.getConfig().getConfiguration(); + configuration.setString("table.dynamic-table-options.enabled", "true"); + + DataStream> dataStream1 = + env.addSource(new StudentDataFunction1(1, 20000)); + + Table inputTable1 = tableEnv.fromDataStream(dataStream1, "uuid, name, ts"); + + tableEnv.createTemporaryView(sourceTable1, inputTable1); + + LOG.info("sinkTableDDL1 ddl: {}", sinkTableDDL1()); + tableEnv.executeSql(sinkTableDDL1()); + + StatementSet statementSet = tableEnv.createStatementSet(); + statementSet.addInsertSql(String.format("insert into %s(uuid, name, _ts1)\n " + + 
"select uuid, name, ts as _ts1 from %s \n", + sinkAliasTable1, sourceTable1)); +// statementSet.addInsertSql(String.format("insert into %s(uuid, name, _ts1)\n " +// + "select uuid, name, TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')) as _ts1 from %s \n", +// sinkAliasTable1, sourceTable1)); + + statementSet.execute(); + } + + public static String sinkTableDDL1() { + return String.format("create table %s(\n" + + " uuid STRING,\n" + + " name STRING,\n" + + " age int,\n" +// + " _ts1 timestamp(3),\n" +// + " _ts2 timestamp(3),\n" + + " _ts1 bigint,\n" + + " _ts2 bigint,\n" + + " PRIMARY KEY(uuid) NOT ENFORCED" + + ")\n" + + " PARTITIONED BY (_ts1)\n" + + " with (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '%s', -- 替换成的绝对路径\n" + + " 'table.type' = 'MERGE_ON_READ',\n" + + " 'write.bucket_assign.tasks' = '6',\n" + + " 'write.tasks' = '3',\n" + + " 'write.partition.format' = 'yyyyMMdd',\n" + + " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n" + + " 'hoodie.bucket.index.num.buckets' = '5',\n" + + " 'changelog.enabled' = 'true',\n" + + " 'index.type' = 'BUCKET',\n" + + " 'hoodie.bucket.index.num.buckets' = '5',\n" + + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age") + + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n" + + " 'hoodie.write.log.suffix' = 'job1',\n" + + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n" + + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n" + + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n" + + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n" + + " 'hoodie.consistency.check.enabled' = 'false',\n" + + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n" + + " 'hoodie.write.lock.early.conflict.detection.strategy' = '" + + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n" +// + " 'hoodie.logfile.data.block.max.size' = '40',\n" + + " 'hoodie.keep.min.commits' = '1440',\n" + + " 'hoodie.keep.max.commits' = '2880',\n" + + " 'compaction.schedule.enabled'='false',\n" + + " 'compaction.async.enabled'='false',\n" + + " 'compaction.trigger.strategy'='num_or_time',\n" + + " 'compaction.delta_commits' ='5',\n" + + " 'compaction.delta_seconds' ='180',\n" + + " 'compaction.max_memory' = '3096',\n" + + " 'clean.async.enabled' ='false',\n" + + " 'hoodie.metrics.on' = 'false',\n" + + " 'hive_sync.enable' = 'false',\n" + + " 'hive_sync.mode' = 'hms',\n" + + " 'hive_sync.db' = '%s',\n" + + " 'hive_sync.table' = '%s',\n" + + " 'hive_sync.metastore.uris' = '%s'\n" + + ")", sinkAliasTable1, basePath, dbName, targetTable, metastoreUrl); + } + + public static class StudentDataFunction1 + extends RichSourceFunction> { + private volatile boolean cancelled; + RandomDataGenerator generator = new RandomDataGenerator(); + private int idStart; + private final int idEnd; + + private StudentDataFunction1() { + this.idStart = 0; + this.idEnd = Integer.MAX_VALUE; + } + + private StudentDataFunction1(int idStart, int idEnd) { + this.idStart = idStart; + this.idEnd = idEnd; + } + + @Override + public void run(SourceContext> sourceContext) throws InterruptedException { + while (!cancelled) { + if (idStart <= idEnd) { + String uuid = String.valueOf(idStart); + String name = generator.nextHexString(10); + Long ts = System.currentTimeMillis(); + + Tuple3 row = Tuple3.of(uuid, name, ts); + sourceContext.collect(row); + + idStart++; + + Thread.sleep(1); + } + } + } 
+ + @Override + public void cancel() { + cancelled = true; + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR02.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR02.java new file mode 100644 index 0000000000000..886eb5b3a8832 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/FlinkPartialUpdateMOR02.java @@ -0,0 +1,160 @@ +package org.apache.hudi.sink; + +import org.apache.commons.math3.random.RandomDataGenerator; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.hudi.common.model.HoodiePayloadProps; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FlinkPartialUpdateMOR02 { + private static final Logger LOG = LoggerFactory.getLogger(FlinkPartialUpdateMOR02.class); + + private static final String sourceTable2 = "source_2"; + private static final String sinkAliasTable2 = "sink_2"; + + private static final String dbName = "hudi_test"; + private final static String targetTable = "hudi_partial_updata_05"; + private static final String warehouse = "hdfs://127.0.0.1:9000/hudi/hudi_db"; + private static final String basePath = warehouse + "/" + dbName + "/" + targetTable; + private static final String metastoreUrl = "thrift://localhost:9083"; + + private FlinkPartialUpdateMOR02() { + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + env.setParallelism(1); + env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); + + Configuration configuration = tableEnv.getConfig().getConfiguration(); + configuration.setString("table.dynamic-table-options.enabled", "true"); + + DataStream> dataStream2 = + env.addSource(new StudentDataFunction2(1, 20000)); + + Table inputTable2 = tableEnv.fromDataStream(dataStream2, "uuid, age, ts"); + + tableEnv.createTemporaryView(sourceTable2, inputTable2); + + + LOG.info("sinkTableDDL2 ddl: {}", sinkTableDDL2()); + tableEnv.executeSql(sinkTableDDL2()); + + StatementSet statementSet = tableEnv.createStatementSet(); + statementSet.addInsertSql(String.format("insert into %s(uuid, age, _ts2)\n " + + "select uuid, age, ts as _ts2 from %s \n", + sinkAliasTable2, sourceTable2)); +// statementSet.addInsertSql(String.format("insert into %s(uuid, age, _ts2)\n " +// + "select uuid, age, TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')) as _ts2 from %s \n", +// sinkAliasTable2, sourceTable2)); + + statementSet.execute(); + } + + public static String sinkTableDDL2() { + return String.format("create table %s(\n" + + " uuid STRING,\n" + + " name STRING,\n" + + " age int,\n" +// + " _ts1 timestamp(3),\n" +// + " 
_ts2 timestamp(3),\n" + + " _ts1 bigint,\n" + + " _ts2 bigint,\n" + + " PRIMARY KEY(uuid) NOT ENFORCED" + + ")\n" + + " PARTITIONED BY (_ts2)\n" + + " with (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '%s', -- replace with your absolute path\n" + + " 'table.type' = 'MERGE_ON_READ',\n" + + " 'write.bucket_assign.tasks' = '3',\n" + + " 'write.tasks' = '6',\n" + + " 'write.partition.format' = 'yyyyMMdd',\n" + + " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n" + + " 'changelog.enabled' = 'true',\n" + + " 'index.type' = 'BUCKET',\n" + + " 'hoodie.bucket.index.num.buckets' = '5',\n" + + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age") + + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n" + + " 'hoodie.write.log.suffix' = 'job2',\n" + + " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n" + + " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n" + + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n" + + " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n" + + " 'hoodie.consistency.check.enabled' = 'false',\n" + + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n" + + " 'hoodie.write.lock.early.conflict.detection.strategy' = '" + + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n" +// + " 'hoodie.logfile.data.block.max.size' = '40',\n" + + " 'hoodie.keep.min.commits' = '1440',\n" + + " 'hoodie.keep.max.commits' = '2880',\n" + + " 'compaction.schedule.enabled'='true',\n" + + " 'compaction.async.enabled'='true',\n" + + " 'compaction.trigger.strategy'='num_or_time',\n" + + " 'compaction.delta_commits' ='5',\n" + + " 'compaction.delta_seconds' ='180',\n" + + " 'compaction.max_memory' = '3096',\n" + + " 'clean.async.enabled' ='false',\n" + + " 'hoodie.metrics.on' = 'false',\n" + + " 'hive_sync.enable' = 'false',\n" + + " 'hive_sync.mode' = 'hms',\n" + + " 'hive_sync.db' = '%s',\n" + + " 'hive_sync.table' = '%s',\n" + + " 'hive_sync.metastore.uris' = '%s'\n" + + ")", sinkAliasTable2, basePath, dbName, targetTable, metastoreUrl); + } + + public static class StudentDataFunction2 + extends RichSourceFunction<Tuple3<String, Integer, Long>> { + private volatile boolean cancelled; + RandomDataGenerator generator = new RandomDataGenerator(); + private int idStart; + private final int idEnd; + + private StudentDataFunction2() { + this.idStart = 0; + this.idEnd = Integer.MAX_VALUE; + } + + private StudentDataFunction2(int idStart, int idEnd) { + this.idStart = idStart; + this.idEnd = idEnd; + } + + @Override + public void run(SourceContext<Tuple3<String, Integer, Long>> sourceContext) throws InterruptedException { + while (!cancelled) { + if (idStart <= idEnd) { + String uuid = String.valueOf(idStart); + Integer age = generator.nextInt(1, 100); + Long ts = System.currentTimeMillis(); + + Tuple3<String, Integer, Long> row = Tuple3.of(uuid, age, ts); + sourceContext.collect(row); + + idStart++; + + Thread.sleep(1); + } + } + } + + @Override + public void cancel() { + cancelled = true; + } + } +}