
Commit bb50ab9

Core: Support Avro file encryption with AES GCM streams (#9436)
1 parent 13d2160 commit bb50ab9

File tree

core/src/main/java/org/apache/iceberg/avro/Avro.java
core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java
core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java

4 files changed: +288 −9 lines


Diff for: core/src/main/java/org/apache/iceberg/avro/Avro.java

-9

@@ -94,9 +94,6 @@ public static WriteBuilder write(OutputFile file) {
   }
 
   public static WriteBuilder write(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new WriteBuilder(file.encryptingOutputFile());
   }
 
@@ -282,9 +279,6 @@ public static DataWriteBuilder writeData(OutputFile file) {
   }
 
   public static DataWriteBuilder writeData(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new DataWriteBuilder(file.encryptingOutputFile());
   }
 
@@ -385,9 +379,6 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
   }
 
   public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
-    Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Avro encryption is not supported");
     return new DeleteWriteBuilder(file.encryptingOutputFile());
   }
Diff for: core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java (new file)

+209

@@ -0,0 +1,209 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.avro;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionTestHelpers;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestEncryptedAvroFileSplit {
  private static final Schema SCHEMA =
      new Schema(
          NestedField.required(1, "id", Types.LongType.get()),
          NestedField.required(2, "data", Types.StringType.get()));

  private static final EncryptionManager ENCRYPTION_MANAGER =
      EncryptionTestHelpers.createEncryptionManager();

  private static final int NUM_RECORDS = 100_000;

  @TempDir Path temp;

  public List<Record> expected = null;
  public InputFile file = null;

  @BeforeEach
  public void writeDataFile() throws IOException {
    this.expected = Lists.newArrayList();

    OutputFile out = Files.localOutput(temp.toFile());

    EncryptedOutputFile eOut = ENCRYPTION_MANAGER.encrypt(out);

    try (FileAppender<Object> writer =
        Avro.write(eOut)
            .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
            .createWriterFunc(DataWriter::create)
            .schema(SCHEMA)
            .overwrite()
            .build()) {

      Record record = GenericRecord.create(SCHEMA);
      for (long i = 0; i < NUM_RECORDS; i += 1) {
        Record next = record.copy(ImmutableMap.of("id", i, "data", UUID.randomUUID().toString()));
        expected.add(next);
        writer.add(next);
      }
    }

    EncryptedInputFile encryptedIn =
        EncryptedFiles.encryptedInput(out.toInputFile(), eOut.keyMetadata());

    this.file = ENCRYPTION_MANAGER.decrypt(encryptedIn);
  }

  @Test
  public void testSplitDataSkipping() throws IOException {
    long end = file.getLength();
    long splitLocation = end / 2;

    List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);

    List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end - splitLocation - 1);
    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);

    assertThat(firstHalf.size() + secondHalf.size())
        .as("Total records should match expected")
        .isEqualTo(expected.size());

    for (int i = 0; i < firstHalf.size(); i += 1) {
      assertThat(firstHalf.get(i)).isEqualTo(expected.get(i));
    }

    for (int i = 0; i < secondHalf.size(); i += 1) {
      assertThat(secondHalf.get(i)).isEqualTo(expected.get(firstHalf.size() + i));
    }
  }

  @Test
  public void testPosField() throws IOException {
    Schema projection =
        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));

    List<Record> records = readAvro(file, projection, 0, file.getLength());

    for (int i = 0; i < expected.size(); i += 1) {
      assertThat(records.get(i).getField(MetadataColumns.ROW_POSITION.name()))
          .as("Field _pos should match")
          .isEqualTo((long) i);

      assertThat(records.get(i).getField("id"))
          .as("Field id should match")
          .isEqualTo(expected.get(i).getField("id"));

      assertThat(records.get(i).getField("data"))
          .as("Field data should match")
          .isEqualTo(expected.get(i).getField("data"));
    }
  }

  @Test
  public void testPosFieldWithSplits() throws IOException {
    Schema projection =
        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));

    long end = file.getLength();
    long splitLocation = end / 2;

    List<Record> secondHalf =
        readAvro(file, projection, splitLocation + 1, end - splitLocation - 1);
    assertThat(secondHalf.size()).as("Second split should not be empty").isNotEqualTo(0);

    List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
    assertThat(firstHalf.size()).as("First split should not be empty").isNotEqualTo(0);

    assertThat(firstHalf.size() + secondHalf.size())
        .as("Total records should match expected")
        .isEqualTo(expected.size());

    for (int i = 0; i < firstHalf.size(); i += 1) {
      assertThat(firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
          .as("Field _pos should match")
          .isEqualTo((long) i);
      assertThat(firstHalf.get(i).getField("id"))
          .as("Field id should match")
          .isEqualTo(expected.get(i).getField("id"));
      assertThat(firstHalf.get(i).getField("data"))
          .as("Field data should match")
          .isEqualTo(expected.get(i).getField("data"));
    }

    for (int i = 0; i < secondHalf.size(); i += 1) {
      assertThat(secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()))
          .as("Field _pos should match")
          .isEqualTo((long) (firstHalf.size() + i));
      assertThat(secondHalf.get(i).getField("id"))
          .as("Field id should match")
          .isEqualTo(expected.get(firstHalf.size() + i).getField("id"));
      assertThat(secondHalf.get(i).getField("data"))
          .as("Field data should match")
          .isEqualTo(expected.get(firstHalf.size() + i).getField("data"));
    }
  }

  @Test
  public void testPosWithEOFSplit() throws IOException {
    Schema projection =
        new Schema(SCHEMA.columns().get(0), MetadataColumns.ROW_POSITION, SCHEMA.columns().get(1));

    long end = file.getLength();

    List<Record> records = readAvro(file, projection, end - 10, 10);
    assertThat(records.size()).as("Should not read any records").isEqualTo(0);
  }

  public List<Record> readAvro(InputFile in, Schema projection, long start, long length)
      throws IOException {
    try (AvroIterable<Record> reader =
        Avro.read(in)
            .createReaderFunc(DataReader::create)
            .split(start, length)
            .project(projection)
            .build()) {
      return Lists.newArrayList(reader);
    }
  }
}
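The read side, condensed from the test above into an illustrative sketch that is likewise not part of this commit: the caller resolves the decrypted InputFile through the EncryptionManager, then reads a byte-range split with the ordinary Avro read builder. The helper name readSplit is hypothetical, and the key metadata passed in must be the one captured from the writer's EncryptedOutputFile.

// Illustrative read-side sketch; follows the decrypt-then-read pattern used in the test above.
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class EncryptedAvroReadExample {
  // keyMetadata is the wrapped-key metadata produced at write time by EncryptionManager.encrypt.
  static List<Record> readSplit(
      EncryptionManager encryption,
      InputFile rawInput,
      EncryptionKeyMetadata keyMetadata,
      Schema projection,
      long start,
      long length)
      throws IOException {
    // Pair the raw (encrypted) bytes with their key metadata and let the manager decrypt.
    InputFile decrypted =
        encryption.decrypt(EncryptedFiles.encryptedInput(rawInput, keyMetadata));

    // Read only the requested byte range; row positions stay correct across splits.
    try (AvroIterable<Record> reader =
        Avro.read(decrypted)
            .createReaderFunc(DataReader::create)
            .project(projection)
            .split(start, length)
            .build()) {
      return Lists.newArrayList(reader);
    }
  }
}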
Diff for: core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java (new file)

+41

@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.encryption;

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class EncryptionTestHelpers {

  private EncryptionTestHelpers() {}

  public static EncryptionManager createEncryptionManager() {
    Map<String, String> catalogProperties = Maps.newHashMap();
    catalogProperties.put(
        CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName());
    Map<String, String> tableProperties = Maps.newHashMap();
    tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1);
    tableProperties.put(TableProperties.FORMAT_VERSION, "2");

    return EncryptionUtil.createEncryptionManager(
        tableProperties, EncryptionUtil.createKmsClient(catalogProperties));
  }
}
Diff for: core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java (new file)

+38

@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.encryption;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class UnitestKMS extends MemoryMockKMS {
  public static final String MASTER_KEY_NAME1 = "keyA";
  public static final byte[] MASTER_KEY1 = "0123456789012345".getBytes(StandardCharsets.UTF_8);
  public static final String MASTER_KEY_NAME2 = "keyB";
  public static final byte[] MASTER_KEY2 = "1123456789012345".getBytes(StandardCharsets.UTF_8);

  @Override
  public void initialize(Map<String, String> properties) {
    masterKeys =
        ImmutableMap.of(
            MASTER_KEY_NAME1, MASTER_KEY1,
            MASTER_KEY_NAME2, MASTER_KEY2);
  }
}
