@@ -232,9 +232,10 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
this.txnManager.beginTransaction(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
// already within lock, and so no lock required for commit.
metadata.setNeedLock(false);
preCommit(inflightInstant, metadata);
commit(table, commitActionType, instantTime, metadata, stats);
// already within lock, and so no lock required for archival
postCommit(table, metadata, instantTime, extraMetadata, false);
LOG.info("Committed " + instantTime);
releaseResources();
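A compact sketch of the pattern this hunk introduces: once commitStats has begun a transaction, it flips a flag on the commit metadata so that later steps (pre-commit, commit, archival, metadata-table writes) know the lock is already held and skip re-acquiring it. The types below are simplified stand-ins for illustration, not Hudi's actual classes.

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for HoodieCommitMetadata: only the needLock hand-off is modeled.
final class CommitMetadataModel {
  private boolean needLock = true;           // mirrors the new isNeedLock field, defaulting to true
  void setNeedLock(boolean needLock) { this.needLock = needLock; }
  boolean getNeedLock() { return needLock; }
}

// Simplified stand-in for the commit flow guarded by the transaction manager.
final class CommitStatsSketch {
  private final ReentrantLock txnLock = new ReentrantLock();

  void commitStats(CommitMetadataModel metadata) {
    txnLock.lock();                          // beginTransaction(...) in the real flow
    try {
      metadata.setNeedLock(false);           // already within the lock: downstream steps must not re-acquire it
      // preCommit(...), commit(...) and postCommit(...) run here under the same lock
    } finally {
      txnLock.unlock();                      // endTransaction(...) in the real flow
    }
  }
}
```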
@@ -43,7 +43,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
try {
// De-dupe/merge if needed
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, executor.config.getSchema());
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
@@ -69,8 +69,8 @@ protected abstract I tag(
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);

public I combineOnCondition(
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, String schemaString) {
return condition ? deduplicateRecords(records, table, parallelism, schemaString) : records;
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
return condition ? deduplicateRecords(records, table, parallelism) : records;
}

/**
@@ -81,8 +81,8 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already deduplicated
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism, String schemaString) {
return deduplicateRecords(records, table.getIndex(), parallelism, schemaString);
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
}

public abstract I deduplicateRecords(
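The two signature changes above narrow the helper chain: callers no longer thread the schema string through combineOnCondition, and deduplicateRecords resolves it once from the table's write config. A rough model of that shape, using made-up stand-in types rather than Hudi's generics:

```java
import java.util.List;
import java.util.function.BiFunction;

// Stand-in model of the narrowed helper chain: the schema is looked up from the
// table's config at the single point that needs it, instead of being a parameter.
final class WriteHelperSketch<R> {

  interface TableLike { String schemaFromConfig(); }

  List<R> combineOnCondition(boolean condition, List<R> records, int parallelism, TableLike table,
                             BiFunction<List<R>, String, List<R>> dedupe) {
    // parallelism is kept only to mirror the real signature; it is unused in this sketch
    return condition ? dedupe.apply(records, table.schemaFromConfig()) : records;
  }
}
```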
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -55,16 +56,15 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaString) {
boolean isIndexingGlobal = index.isGlobal();
Properties properties = new Properties();
properties.put("schema", schemaString);
final SerializableSchema schema = new SerializableSchema(schemaString);
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties);
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
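A single-process model of the dedupe above: records are keyed by record key alone when the index is global (so duplicates across partitions collide), otherwise by the full key, and colliding records are merged by a preCombine-style function. In the real code the collection is a distributed HoodieData and the parsed Avro schema travels to executors wrapped in SerializableSchema; plain Java streams and a made-up record type stand in for that here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

final class DedupeSketch {

  record RecordModel(String recordKey, String partitionPath, String payload) { }

  static List<RecordModel> deduplicate(List<RecordModel> records,
                                       boolean isIndexingGlobal,
                                       BinaryOperator<RecordModel> preCombine) {
    // reduceByKey equivalent: group by the chosen key and combine colliding records
    Map<String, RecordModel> byKey = records.stream().collect(Collectors.toMap(
        r -> isIndexingGlobal ? r.recordKey() : r.recordKey() + "|" + r.partitionPath(),
        r -> r,
        preCombine));
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    List<RecordModel> input = List.of(
        new RecordModel("k1", "2024/01/01", "v1"),
        new RecordModel("k1", "2024/01/01", "v2"));
    // "newer wins" stands in for a schema-aware preCombine implementation
    System.out.println(deduplicate(input, false, (older, newer) -> newer));
  }
}
```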
@@ -268,9 +268,10 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
initMetadataWriter();
}
try {
// guard the metadata writer with concurrent lock
this.txnManager.getLockManager().lock();

if (metadata.getNeedLock()) {
// guard the metadata writer with concurrent lock
this.txnManager.getLockManager().lock();
}
// refresh the timeline

// Note: the data meta client is not refreshed currently, some code path
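On the consuming side of that flag, the metadata writer now locks only when the commit metadata says no caller already holds the transaction lock. A minimal sketch of the conditional acquisition, again with an illustrative lock rather than Hudi's lock manager:

```java
import java.util.concurrent.locks.ReentrantLock;

final class MetadataWriterSketch {
  private final ReentrantLock metadataLock = new ReentrantLock();

  // needLock == false means the caller already holds the transaction lock,
  // so taking it again here would be redundant (or unsafe with a non-reentrant lock).
  void writeTableMetadata(boolean needLock, Runnable update) {
    if (needLock) {
      metadataLock.lock();
      try {
        update.run();          // guarded update when invoked outside a transaction
      } finally {
        metadataLock.unlock();
      }
    } else {
      update.run();            // the caller's transaction already guards this update
    }
  }
}
```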
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,9 +99,8 @@ public List<HoodieRecord<T>> deduplicateRecords(
final T data1 = rec1.getData();
final T data2 = rec2.getData();

Properties properties = new Properties();
properties.put("schema", schemaString);
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties);
final Schema schema = new Schema.Parser().parse(schemaString);
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.

This file was deleted.

@@ -103,7 +103,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,

if (performDedupe) {
dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
parallelism, table, config.getSchema());
parallelism, table);
}

final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
@@ -100,7 +100,7 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecor

if (performDedupe) {
dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
parallelism, table, config.getSchema());
parallelism, table);
}

// only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
38 changes: 12 additions & 26 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,18 +18,6 @@

package org.apache.hudi.avro;

import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MultipleOrderingVal2ColsInfo;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
@@ -54,6 +42,16 @@
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -70,13 +68,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Objects;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -497,17 +494,6 @@ public static Object getNestedFieldValAsString(GenericRecord record, String fiel
return obj == null ? "" : StringUtils.objToString(obj);
}

public static Object getMultipleNestedFieldVals(GenericRecord record, String fieldMappings, boolean consistentLogicalTimestampEnabled) {
MultipleOrderingVal2ColsInfo multipleOrderingVal2ColsInfo = new MultipleOrderingVal2ColsInfo(fieldMappings);
multipleOrderingVal2ColsInfo.getOrderingVal2ColsInfoList().forEach(orderingVal2ColsInfo -> {
Object val = getNestedFieldVal(record, orderingVal2ColsInfo.getOrderingField(), true, consistentLogicalTimestampEnabled);
if (Objects.nonNull(val)) {
orderingVal2ColsInfo.setOrderingValue(val.toString());
}
});
return multipleOrderingVal2ColsInfo.generateOrderingText();
}

/**
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
*/
@@ -38,6 +38,10 @@ public SerializableSchema() {
public SerializableSchema(Schema schema) {
this.schema = newCopy(schema);
}

public SerializableSchema(String schemaStr) {
this.schema = new Schema.Parser().parse(schemaStr);
}

public SerializableSchema(SerializableSchema serializableSchema) {
this(serializableSchema.schema);
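The new constructor makes it possible to build a SerializableSchema straight from the schema string carried in the write config; the wrapper can then be captured by code that has to be Java-serializable (for example Spark closures) and unwrapped with get() where the Avro Schema is needed. A brief usage sketch; the record schema below is made up for illustration:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.config.SerializableSchema;

public class SerializableSchemaExample {
  public static void main(String[] args) {
    String schemaStr = "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"}]}";

    // Parse once up front; the wrapper can then be shipped with serialized tasks.
    SerializableSchema serializableSchema = new SerializableSchema(schemaStr);

    Schema schema = serializableSchema.get();   // unwrap the Avro schema where needed
    System.out.println(schema.getFields());
  }
}
```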
@@ -52,6 +52,8 @@ public class HoodieCommitMetadata implements Serializable {
protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
protected Boolean compacted;

protected Boolean isNeedLock = true;

protected Map<String, String> extraMetadata;

protected WriteOperationType operationType = WriteOperationType.UNKNOWN;
@@ -106,6 +108,14 @@ public void setCompacted(Boolean compacted) {
this.compacted = compacted;
}

public Boolean getNeedLock() {
return isNeedLock;
}

public void setNeedLock(Boolean needLock) {
isNeedLock = needLock;
}

public HashMap<String, String> getFileIdAndRelativePaths() {
HashMap<String, String> filePaths = new HashMap<>();
// list all partitions paths
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}

/**
* When more than one HoodieRecord shares the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert, taking the record schema as an additional input.
* Implementations can leverage the schema when deciding how the payloads should be combined.
*
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
* @param schema Payload related schema. For example, it can be used to overwrite fields of the old instance whose values do not equal their defaults.
* @param properties Payload related properties. For example, the name(s) of the ordering field(s) to extract from the value in storage.
* @return the combined value
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
default T preCombine(T oldValue, Schema schema, Properties properties) {
return preCombine(oldValue, properties);
}

/**
* This method is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for its Javadoc.
*/
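Because the new overload is a default method, existing payloads keep working unchanged: unless a payload overrides it, the schema-aware call falls through to the older preCombine(oldValue, properties). The self-contained model below mirrors that fallback chain; the PartialUpdatePayload class is hypothetical and not part of Hudi.

```java
import java.util.Properties;

import org.apache.avro.Schema;

// Minimal stand-in mirroring the default-method chain added above.
interface PayloadLike<T extends PayloadLike<T>> {
  T preCombine(T oldValue);

  default T preCombine(T oldValue, Properties properties) {
    return preCombine(oldValue);
  }

  // New overload: falls back to the properties-only variant unless a payload overrides it.
  default T preCombine(T oldValue, Schema schema, Properties properties) {
    return preCombine(oldValue, properties);
  }
}

// Hypothetical payload overriding the schema-aware variant, e.g. to merge field by field
// while treating fields still at their schema default as "not provided".
final class PartialUpdatePayload implements PayloadLike<PartialUpdatePayload> {
  private final long orderingVal;

  PartialUpdatePayload(long orderingVal) {
    this.orderingVal = orderingVal;
  }

  @Override
  public PartialUpdatePayload preCombine(PartialUpdatePayload oldValue) {
    return orderingVal >= oldValue.orderingVal ? this : oldValue;
  }

  @Override
  public PartialUpdatePayload preCombine(PartialUpdatePayload oldValue, Schema schema, Properties properties) {
    // A real implementation could walk schema.getFields() and merge per field;
    // this sketch keeps the ordering-based behaviour for brevity.
    return preCombine(oldValue, properties);
  }
}
```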