Skip to content

Commit 1a265fc

Browse files
committed
Add Parquet decryption support
1 parent 3a759e1 commit 1a265fc

File tree

57 files changed

+2527
-101
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2527
-101
lines changed

Diff for: lib/trino-parquet/pom.xml

+12
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@
100100
</exclusions>
101101
</dependency>
102102

103+
<dependency>
104+
<groupId>io.trino</groupId>
105+
<artifactId>trino-filesystem</artifactId>
106+
<scope>provided</scope>
107+
</dependency>
108+
103109
<dependency>
104110
<groupId>io.trino</groupId>
105111
<artifactId>trino-spi</artifactId>
@@ -192,6 +198,12 @@
192198
<scope>test</scope>
193199
</dependency>
194200

201+
<dependency>
202+
<groupId>org.apache.parquet</groupId>
203+
<artifactId>parquet-hadoop</artifactId>
204+
<scope>test</scope>
205+
</dependency>
206+
195207
<dependency>
196208
<groupId>org.assertj</groupId>
197209
<artifactId>assertj-core</artifactId>

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ public abstract sealed class DataPage
2121
{
2222
protected final int valueCount;
2323
private final OptionalLong firstRowIndex;
24+
private final int pageIndex;
2425

25-
public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex)
26+
public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex, int pageIndex)
2627
{
2728
super(uncompressedSize);
2829
this.valueCount = valueCount;
2930
this.firstRowIndex = firstRowIndex;
31+
this.pageIndex = pageIndex;
3032
}
3133

3234
/**
@@ -41,4 +43,9 @@ public int getValueCount()
4143
{
4244
return valueCount;
4345
}
46+
47+
public int getPageIndex()
48+
{
49+
return pageIndex;
50+
}
4451
}

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,17 @@ public DataPageV1(
3535
OptionalLong firstRowIndex,
3636
ParquetEncoding repetitionLevelEncoding,
3737
ParquetEncoding definitionLevelEncoding,
38-
ParquetEncoding valuesEncoding)
38+
ParquetEncoding valuesEncoding,
39+
int pageIndex)
3940
{
40-
super(uncompressedSize, valueCount, firstRowIndex);
41+
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
4142
this.slice = requireNonNull(slice, "slice is null");
4243
this.repetitionLevelEncoding = repetitionLevelEncoding;
4344
this.definitionLevelEncoding = definitionLevelEncoding;
4445
this.valuesEncoding = valuesEncoding;
4546
}
4647

48+
@Override
4749
public Slice getSlice()
4850
{
4951
return slice;

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ public DataPageV2(
4444
int uncompressedSize,
4545
OptionalLong firstRowIndex,
4646
Statistics<?> statistics,
47-
boolean isCompressed)
47+
boolean isCompressed,
48+
int pageIndex)
4849
{
49-
super(uncompressedSize, valueCount, firstRowIndex);
50+
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
5051
this.rowCount = rowCount;
5152
this.nullCount = nullCount;
5253
this.repetitionLevels = requireNonNull(repetitionLevels, "repetitionLevels slice is null");
@@ -82,6 +83,7 @@ public ParquetEncoding getDataEncoding()
8283
return dataEncoding;
8384
}
8485

86+
@Override
8587
public Slice getSlice()
8688
{
8789
return slice;

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/DictionaryPage.java

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public DictionaryPage(Slice slice, int uncompressedSize, int dictionarySize, Par
4343
encoding);
4444
}
4545

46+
@Override
4647
public Slice getSlice()
4748
{
4849
return slice;

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/Page.java

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package io.trino.parquet;
1515

16+
import io.airlift.slice.Slice;
17+
1618
public abstract class Page
1719
{
1820
protected final int uncompressedSize;
@@ -26,4 +28,6 @@ public int getUncompressedSize()
2628
{
2729
return uncompressedSize;
2830
}
31+
32+
public abstract Slice getSlice();
2933
}

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.parquet;
1515

1616
import com.google.errorprone.annotations.FormatMethod;
17+
import io.trino.parquet.crypto.ParquetCryptoException;
1718

1819
public final class ParquetValidationUtils
1920
{
@@ -27,4 +28,12 @@ public static void validateParquet(boolean condition, ParquetDataSourceId dataSo
2728
throw new ParquetCorruptionException(dataSourceId, formatString, args);
2829
}
2930
}
31+
32+
@FormatMethod
33+
public static void validateParquetCrypto(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
34+
{
35+
if (!condition) {
36+
throw new ParquetCryptoException(dataSourceId, formatString, args);
37+
}
38+
}
3039
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.parquet.crypto;
15+
16+
import javax.crypto.Cipher;
17+
import javax.crypto.spec.SecretKeySpec;
18+
19+
import java.security.SecureRandom;
20+
21+
import static java.util.Objects.requireNonNull;
22+
23+
public class AesCipher
24+
{
25+
public static final int NONCE_LENGTH = 12;
26+
public static final int GCM_TAG_LENGTH = 16;
27+
protected static final int CTR_IV_LENGTH = 16;
28+
protected static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;
29+
protected static final int CHUNK_LENGTH = 4 * 1024;
30+
protected static final int SIZE_LENGTH = 4;
31+
// NIST SP 800-38D section 8.3 specifies limit on AES GCM encryption operations with same key and random IV/nonce
32+
protected static final long GCM_RANDOM_IV_SAME_KEY_MAX_OPS = 1L << 32;
33+
// NIST SP 800-38A doesn't specify limit on AES CTR operations.
34+
// However, Parquet uses a random IV (with 12-byte random nonce). To avoid repetition due to "birthday problem",
35+
// setting a conservative limit equal to GCM's value for random IVs
36+
protected static final long CTR_RANDOM_IV_SAME_KEY_MAX_OPS = GCM_RANDOM_IV_SAME_KEY_MAX_OPS;
37+
static final int AAD_FILE_UNIQUE_LENGTH = 8;
38+
protected final SecureRandom randomGenerator;
39+
protected final byte[] localNonce;
40+
protected SecretKeySpec aesKey;
41+
protected Cipher cipher;
42+
43+
AesCipher(byte[] keyBytes)
44+
{
45+
requireNonNull(keyBytes, "key bytes cannot be null");
46+
boolean allZeroKey = true;
47+
for (byte kb : keyBytes) {
48+
if (kb != 0) {
49+
allZeroKey = false;
50+
break;
51+
}
52+
}
53+
54+
if (allZeroKey) {
55+
throw new IllegalArgumentException("All key bytes are zero");
56+
}
57+
58+
aesKey = new SecretKeySpec(keyBytes, "AES");
59+
randomGenerator = new SecureRandom();
60+
localNonce = new byte[NONCE_LENGTH];
61+
}
62+
63+
public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, int rowGroupOrdinal, int columnOrdinal, int pageOrdinal)
64+
{
65+
byte[] typeOrdinalBytes = new byte[1];
66+
typeOrdinalBytes[0] = moduleType.getValue();
67+
68+
if (ModuleType.Footer == moduleType) {
69+
return concatByteArrays(fileAAD, typeOrdinalBytes);
70+
}
71+
72+
if (rowGroupOrdinal < 0) {
73+
throw new IllegalArgumentException("Wrong row group ordinal: " + rowGroupOrdinal);
74+
}
75+
short shortRGOrdinal = (short) rowGroupOrdinal;
76+
if (shortRGOrdinal != rowGroupOrdinal) {
77+
throw new ParquetCryptoException("Encrypted parquet files can't have more than %s row groups: %s", Short.MAX_VALUE, rowGroupOrdinal);
78+
}
79+
byte[] rowGroupOrdinalBytes = shortToBytesLE(shortRGOrdinal);
80+
81+
if (columnOrdinal < 0) {
82+
throw new IllegalArgumentException("Wrong column ordinal: " + columnOrdinal);
83+
}
84+
short shortColumOrdinal = (short) columnOrdinal;
85+
if (shortColumOrdinal != columnOrdinal) {
86+
throw new ParquetCryptoException("Encrypted parquet files can't have more than %s columns: %s", Short.MAX_VALUE, columnOrdinal);
87+
}
88+
byte[] columnOrdinalBytes = shortToBytesLE(shortColumOrdinal);
89+
90+
if (ModuleType.DataPage != moduleType && ModuleType.DataPageHeader != moduleType) {
91+
return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes);
92+
}
93+
94+
if (pageOrdinal < 0) {
95+
throw new IllegalArgumentException("Wrong page ordinal: " + pageOrdinal);
96+
}
97+
short shortPageOrdinal = (short) pageOrdinal;
98+
if (shortPageOrdinal != pageOrdinal) {
99+
throw new ParquetCryptoException("Encrypted parquet files can't have more than %s pages per chunk: %s", Short.MAX_VALUE, pageOrdinal);
100+
}
101+
byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
102+
103+
return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes, pageOrdinalBytes);
104+
}
105+
106+
public static byte[] createFooterAAD(byte[] aadPrefixBytes)
107+
{
108+
return createModuleAAD(aadPrefixBytes, ModuleType.Footer, -1, -1, -1);
109+
}
110+
111+
// Update last two bytes with new page ordinal (instead of creating new page AAD from scratch)
112+
public static void quickUpdatePageAAD(byte[] pageAAD, int newPageOrdinal)
113+
{
114+
requireNonNull(pageAAD, "pageAAD cannot be null");
115+
if (newPageOrdinal < 0) {
116+
throw new IllegalArgumentException("Wrong page ordinal: " + newPageOrdinal);
117+
}
118+
short shortPageOrdinal = (short) newPageOrdinal;
119+
if (shortPageOrdinal != newPageOrdinal) {
120+
throw new ParquetCryptoException("Encrypted parquet files can't have more than %s pages per chunk: %s", Short.MAX_VALUE, newPageOrdinal);
121+
}
122+
123+
byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
124+
System.arraycopy(pageOrdinalBytes, 0, pageAAD, pageAAD.length - 2, 2);
125+
}
126+
127+
static byte[] concatByteArrays(byte[]... arrays)
128+
{
129+
int totalLength = 0;
130+
for (byte[] array : arrays) {
131+
totalLength += array.length;
132+
}
133+
134+
byte[] output = new byte[totalLength];
135+
int offset = 0;
136+
for (byte[] array : arrays) {
137+
System.arraycopy(array, 0, output, offset, array.length);
138+
offset += array.length;
139+
}
140+
141+
return output;
142+
}
143+
144+
private static byte[] shortToBytesLE(short input)
145+
{
146+
byte[] output = new byte[2];
147+
output[1] = (byte) (0xff & (input >> 8));
148+
output[0] = (byte) (0xff & input);
149+
150+
return output;
151+
}
152+
}

0 commit comments

Comments
 (0)