Skip to content

Commit 71ec538

Browse files
committed
Make the code a bit more pretty
1 parent 44086d8 commit 71ec538

File tree

16 files changed

+952
-1199
lines changed

16 files changed

+952
-1199
lines changed

core/src/main/java/org/apache/iceberg/avro/Avro.java

+254-246
Large diffs are not rendered by default.

core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@
3636
import org.slf4j.LoggerFactory;
3737

3838
/**
39-
* Registry which maintains the available {@link ReadBuilder}s and {@link WriterService}
40-
* implementations. Based on the `file format`, the required `data type` and the reader/writer
41-
* `builderType` the registry returns the correct reader and writer service implementations. These
42-
* services could be used to generate the correct reader and writer builders.
39+
* Registry which maintains the available {@link ReadBuilder}s and {@link WriteBuilder}s. Based on
40+
* the `file format`, the required `data type` and the reader/writer `builderType` the registry
41+
* returns the correct reader and writer builders. These services could be used to generate the
42+
* correct readers and writers.
4343
*/
4444
public final class DataFileServiceRegistry {
4545
private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class);
46+
// The list of classes which are used for registering the reader and writer builders
4647
private static final String[] CLASSES_TO_REGISTER =
4748
new String[] {
4849
"org.apache.iceberg.parquet.Parquet",
@@ -58,6 +59,7 @@ public final class DataFileServiceRegistry {
5859
private static final Map<Key, Function<InputFile, ReadBuilder<?, ?>>> READ_BUILDERS =
5960
Maps.newConcurrentMap();
6061

62+
/** Registers a new writer builder for the given format/input type. */
6163
public static void registerWrite(
6264
FileFormat format,
6365
String inputType,
@@ -73,11 +75,13 @@ public static void registerWrite(
7375
WRITE_BUILDERS.putIfAbsent(key, writeBuilder);
7476
}
7577

78+
/** Registers a new reader builder for the given format/input type. */
7679
public static void registerRead(
7780
FileFormat format, String outputType, Function<InputFile, ReadBuilder<?, ?>> readBuilder) {
7881
registerRead(format, outputType, null, readBuilder);
7982
}
8083

84+
/** Registers a new reader builder for the given format/input type/reader type. */
8185
public static void registerRead(
8286
FileFormat format,
8387
String outputType,
@@ -107,8 +111,10 @@ private static void registerSupportedFormats() {
107111
Record.class.getName(),
108112
outputFile ->
109113
new Avro.AvroDataWriteBuilder<Record, Schema>(outputFile)
110-
.writerFunction((nativeType, schema) -> DataWriter.create(schema)));
114+
.writerFunction((nativeType, schema) -> DataWriter.create(schema))
115+
.positionDeleteWriterFunction((nativeType, schema) -> DataWriter.create(schema)));
111116

117+
// Uses dynamic methods to call the `register` for the listed classes
112118
for (String s : CLASSES_TO_REGISTER) {
113119
try {
114120
DynMethods.StaticMethod register =

core/src/main/java/org/apache/iceberg/io/datafile/ReadBuilder.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
* Builder API for reading Iceberg data files.
3030
*
3131
* @param <D> The records returned by the reader
32-
* @param <F> The records accepted by the {@link DeleteFilter}
32+
* @param <F> The records accepted by the {@link DeleteFilter}. Different from `D` for vectorized
33+
* readers.
3334
*/
3435
public interface ReadBuilder<D, F> {
3536
/**
@@ -98,13 +99,16 @@ default ReadBuilder<D, F> withDeleteFilter(DeleteFilter<F> newDeleteFilter) {
9899
/** Sets a mapping from external schema names to Iceberg type IDs. */
99100
ReadBuilder<D, F> withNameMapping(NameMapping newNameMapping);
100101

102+
/** Sets the file encryption key used for reading the file. */
101103
default ReadBuilder<D, F> withFileEncryptionKey(ByteBuffer encryptionKey) {
102104
throw new UnsupportedOperationException("Not supported");
103105
}
104106

107+
/** Sets the additional authentication data prefix for encryption. */
105108
default ReadBuilder<D, F> withAADPrefix(ByteBuffer aadPrefix) {
106109
throw new UnsupportedOperationException("Not supported");
107110
}
108111

112+
/** Builds the reader. */
109113
CloseableIterable<D> build();
110114
}

core/src/main/java/org/apache/iceberg/io/datafile/WriteBuilder.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ public interface WriteBuilder<D, T> {
4747
/** Sets the file metadata kep/value pairs for the writer which should be written to the file. */
4848
WriteBuilder<D, T> meta(String property, String value);
4949

50-
default WriteBuilder<D, T> meta(Map<String, String> properties) {
51-
properties.forEach(this::meta);
52-
return this;
53-
}
54-
5550
/** Enables overwriting previously created files. */
5651
WriteBuilder<D, T> overwrite();
5752

@@ -63,12 +58,16 @@ default WriteBuilder<D, T> meta(Map<String, String> properties) {
6358
*/
6459
WriteBuilder<D, T> metricsConfig(MetricsConfig newMetricsConfig);
6560

61+
/** Sets the partition specification for the generated {@link org.apache.iceberg.ContentFile}. */
6662
WriteBuilder<D, T> withSpec(PartitionSpec newSpec);
6763

64+
/** Sets the partition value for the generated {@link org.apache.iceberg.ContentFile}. */
6865
WriteBuilder<D, T> withPartition(StructLike newPartition);
6966

67+
/** Sets the encryption key metadata for the generated {@link org.apache.iceberg.ContentFile}. */
7068
WriteBuilder<D, T> withKeyMetadata(EncryptionKeyMetadata metadata);
7169

70+
/** Sets the sort order for the generated {@link org.apache.iceberg.ContentFile}. */
7271
WriteBuilder<D, T> withSortOrder(SortOrder newSortOrder);
7372

7473
/** The target data file schema. */
@@ -79,10 +78,12 @@ default WriteBuilder<D, T> meta(Map<String, String> properties) {
7978
*/
8079
WriteBuilder<D, T> rowSchema(Schema schema);
8180

81+
/** Writes the file with the given encryption key */
8282
default WriteBuilder<D, T> withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
8383
throw new UnsupportedOperationException("Not supported");
8484
}
8585

86+
/** Writes the AAP prefix to the the generated {@link org.apache.iceberg.ContentFile}. */
8687
default WriteBuilder<D, T> withAADPrefix(ByteBuffer aadPrefix) {
8788
throw new UnsupportedOperationException("Not supported");
8889
}
@@ -92,13 +93,21 @@ default WriteBuilder<D, T> withAADPrefix(ByteBuffer aadPrefix) {
9293
/** Sets the equality field ids which are used in the delete file. */
9394
WriteBuilder<D, T> equalityFieldIds(int... fieldIds);
9495

96+
/**
97+
* Sets the engine specific data type for the writer. Used for conversion by the engine specific
98+
* writers.
99+
*/
95100
WriteBuilder<D, T> nativeType(T nativeType);
96101

97-
FileAppender<D> appenderBuilder() throws IOException;
102+
/** Creates an appender. */
103+
FileAppender<D> appender() throws IOException;
98104

99-
DataWriter<D> writerBuilder() throws IOException;
105+
/** Creates a data writer. */
106+
DataWriter<D> dataWriter() throws IOException;
100107

101-
EqualityDeleteWriter<D> equalityWriterBuilder() throws IOException;
108+
/** Creates an equality delete writer. */
109+
EqualityDeleteWriter<D> equalityDeleteWriter() throws IOException;
102110

103-
PositionDeleteWriter<D> positionWriterBuilder() throws IOException;
111+
/** Creates a position delete writer. */
112+
PositionDeleteWriter<D> positionDeleteWriter() throws IOException;
104113
}

data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public FileAppender<Record> newAppender(
9494
.setAll(config)
9595
.metricsConfig(metricsConfig)
9696
.overwrite()
97-
.appenderBuilder();
97+
.appender();
9898
} catch (IOException e) {
9999
throw new UncheckedIOException(e);
100100
}
@@ -136,7 +136,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
136136
.withSpec(spec)
137137
.withKeyMetadata(file.keyMetadata())
138138
.equalityFieldIds(equalityFieldIds)
139-
.equalityWriterBuilder();
139+
.equalityDeleteWriter();
140140
} catch (IOException e) {
141141
throw new UncheckedIOException(e);
142142
}
@@ -158,7 +158,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
158158
.rowSchema(posDeleteRowSchema)
159159
.withSpec(spec)
160160
.withKeyMetadata(file.keyMetadata())
161-
.positionWriterBuilder();
161+
.positionDeleteWriter();
162162
} catch (IOException e) {
163163
throw new UncheckedIOException(e);
164164
}

data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java

+9-21
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
import org.apache.iceberg.SortOrder;
2929
import org.apache.iceberg.Table;
3030
import org.apache.iceberg.avro.Avro;
31-
import org.apache.iceberg.data.avro.DataWriter;
32-
import org.apache.iceberg.data.orc.GenericOrcWriter;
33-
import org.apache.iceberg.data.parquet.GenericParquetWriter;
3431
import org.apache.iceberg.orc.ORC;
3532
import org.apache.iceberg.parquet.Parquet;
3633
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -69,49 +66,40 @@ static Builder builderFor(Table table) {
6966
return new Builder(table);
7067
}
7168

72-
@Override
7369
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
74-
builder.createWriterFunc(DataWriter::create);
70+
throw new UnsupportedOperationException("Deprecated");
7571
}
7672

77-
@Override
7873
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
79-
builder.createWriterFunc(DataWriter::create);
74+
throw new UnsupportedOperationException("Deprecated");
8075
}
8176

82-
@Override
8377
protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
84-
builder.createWriterFunc(DataWriter::create);
78+
throw new UnsupportedOperationException("Deprecated");
8579
}
8680

87-
@Override
8881
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
89-
builder.createWriterFunc(GenericParquetWriter::buildWriter);
82+
throw new UnsupportedOperationException("Deprecated");
9083
}
9184

92-
@Override
9385
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
94-
builder.createWriterFunc(GenericParquetWriter::buildWriter);
86+
throw new UnsupportedOperationException("Deprecated");
9587
}
9688

97-
@Override
9889
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
99-
builder.createWriterFunc(GenericParquetWriter::buildWriter);
90+
throw new UnsupportedOperationException("Deprecated");
10091
}
10192

102-
@Override
10393
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
104-
builder.createWriterFunc(GenericOrcWriter::buildWriter);
94+
throw new UnsupportedOperationException("Deprecated");
10595
}
10696

107-
@Override
10897
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
109-
builder.createWriterFunc(GenericOrcWriter::buildWriter);
98+
throw new UnsupportedOperationException("Deprecated");
11099
}
111100

112-
@Override
113101
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
114-
builder.createWriterFunc(GenericOrcWriter::buildWriter);
102+
throw new UnsupportedOperationException("Deprecated");
115103
}
116104

117105
static class Builder {

data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java

+3-33
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,13 @@
2828
import org.apache.iceberg.SortOrder;
2929
import org.apache.iceberg.StructLike;
3030
import org.apache.iceberg.Table;
31-
import org.apache.iceberg.avro.Avro;
3231
import org.apache.iceberg.deletes.EqualityDeleteWriter;
3332
import org.apache.iceberg.deletes.PositionDeleteWriter;
3433
import org.apache.iceberg.encryption.EncryptedOutputFile;
3534
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3635
import org.apache.iceberg.io.DataWriter;
3736
import org.apache.iceberg.io.FileWriterFactory;
3837
import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
39-
import org.apache.iceberg.orc.ORC;
40-
import org.apache.iceberg.parquet.Parquet;
4138
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4239

4340
/**
@@ -93,33 +90,6 @@ protected RegistryBasedFileWriterFactory(
9390
this.positionalDeleteSchemaType = positionalDeleteSchemaType;
9491
}
9592

96-
@Deprecated
97-
protected abstract void configureDataWrite(Avro.DataWriteBuilder builder);
98-
99-
@Deprecated
100-
protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder);
101-
102-
@Deprecated
103-
protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder);
104-
105-
@Deprecated
106-
protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder);
107-
108-
@Deprecated
109-
protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
110-
111-
@Deprecated
112-
protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);
113-
114-
@Deprecated
115-
protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
116-
117-
@Deprecated
118-
protected abstract void configureEqualityDelete(ORC.DeleteWriteBuilder builder);
119-
120-
@Deprecated
121-
protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder);
122-
12393
protected S rowSchemaType() {
12494
return rowSchemaType;
12595
}
@@ -151,7 +121,7 @@ public DataWriter<T> newDataWriter(
151121
.withKeyMetadata(keyMetadata)
152122
.withSortOrder(dataSortOrder)
153123
.overwrite()
154-
.writerBuilder();
124+
.dataWriter();
155125
} catch (IOException e) {
156126
throw new UncheckedIOException(e);
157127
}
@@ -177,7 +147,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
177147
.withKeyMetadata(keyMetadata)
178148
.withSortOrder(equalityDeleteSortOrder)
179149
.overwrite()
180-
.equalityWriterBuilder();
150+
.equalityDeleteWriter();
181151
} catch (IOException e) {
182152
throw new UncheckedIOException("Failed to create new equality delete writer", e);
183153
}
@@ -201,7 +171,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
201171
.withPartition(partition)
202172
.withKeyMetadata(keyMetadata)
203173
.overwrite()
204-
.positionWriterBuilder();
174+
.positionDeleteWriter();
205175
} catch (IOException e) {
206176
throw new UncheckedIOException("Failed to create new position delete writer", e);
207177
}

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
145145
.schema(schema)
146146
.metricsConfig(metricsConfig)
147147
.overwrite()
148-
.appenderBuilder();
148+
.appender();
149149
} catch (IOException e) {
150150
throw new UncheckedIOException(e);
151151
}
@@ -186,7 +186,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
186186
.withSpec(spec)
187187
.withKeyMetadata(outputFile.keyMetadata())
188188
.equalityFieldIds(equalityFieldIds)
189-
.equalityWriterBuilder();
189+
.equalityDeleteWriter();
190190
} catch (IOException e) {
191191
throw new UncheckedIOException(e);
192192
}
@@ -207,7 +207,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
207207
.rowSchema(posDeleteRowSchema)
208208
.withSpec(spec)
209209
.withKeyMetadata(outputFile.keyMetadata())
210-
.positionWriterBuilder();
210+
.positionDeleteWriter();
211211
} catch (IOException e) {
212212
throw new UncheckedIOException(e);
213213
}

0 commit comments

Comments
 (0)