Skip to content

Add Parquet decryption support for Hive tables #24517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/trino-parquet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand Down Expand Up @@ -192,6 +198,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ public abstract sealed class DataPage
{
protected final int valueCount;
private final OptionalLong firstRowIndex;
private final int pageIndex;

public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex)
public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex, int pageIndex)
{
super(uncompressedSize);
this.valueCount = valueCount;
this.firstRowIndex = firstRowIndex;
this.pageIndex = pageIndex;
}

/**
Expand All @@ -41,4 +43,9 @@ public int getValueCount()
{
return valueCount;
}

public int getPageIndex()
{
return pageIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ public DataPageV1(
OptionalLong firstRowIndex,
ParquetEncoding repetitionLevelEncoding,
ParquetEncoding definitionLevelEncoding,
ParquetEncoding valuesEncoding)
ParquetEncoding valuesEncoding,
int pageIndex)
{
super(uncompressedSize, valueCount, firstRowIndex);
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
this.slice = requireNonNull(slice, "slice is null");
this.repetitionLevelEncoding = repetitionLevelEncoding;
this.definitionLevelEncoding = definitionLevelEncoding;
this.valuesEncoding = valuesEncoding;
}

@Override
public Slice getSlice()
{
return slice;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public DataPageV2(
int uncompressedSize,
OptionalLong firstRowIndex,
Statistics<?> statistics,
boolean isCompressed)
boolean isCompressed,
int pageIndex)
{
super(uncompressedSize, valueCount, firstRowIndex);
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = requireNonNull(repetitionLevels, "repetitionLevels slice is null");
Expand Down Expand Up @@ -82,6 +83,7 @@ public ParquetEncoding getDataEncoding()
return dataEncoding;
}

@Override
public Slice getSlice()
{
return slice;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public DictionaryPage(Slice slice, int uncompressedSize, int dictionarySize, Par
encoding);
}

@Override
public Slice getSlice()
{
return slice;
Expand Down
4 changes: 4 additions & 0 deletions lib/trino-parquet/src/main/java/io/trino/parquet/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.parquet;

import io.airlift.slice.Slice;

public abstract class Page
{
protected final int uncompressedSize;
Expand All @@ -26,4 +28,6 @@ public int getUncompressedSize()
{
return uncompressedSize;
}

public abstract Slice getSlice();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.parquet;

import com.google.errorprone.annotations.FormatMethod;
import io.trino.parquet.crypto.ParquetCryptoException;

public final class ParquetValidationUtils
{
Expand All @@ -27,4 +28,12 @@ public static void validateParquet(boolean condition, ParquetDataSourceId dataSo
throw new ParquetCorruptionException(dataSourceId, formatString, args);
}
}

@FormatMethod
public static void validateParquetCrypto(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
{
if (!condition) {
throw new ParquetCryptoException(dataSourceId, formatString, args);
}
}
}
152 changes: 152 additions & 0 deletions lib/trino-parquet/src/main/java/io/trino/parquet/crypto/AesCipher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.crypto;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

import java.security.SecureRandom;

import static java.util.Objects.requireNonNull;

public class AesCipher
{
    public static final int NONCE_LENGTH = 12;
    public static final int GCM_TAG_LENGTH = 16;
    protected static final int CTR_IV_LENGTH = 16;
    protected static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;
    protected static final int CHUNK_LENGTH = 4 * 1024;
    protected static final int SIZE_LENGTH = 4;
    // NIST SP 800-38D section 8.3 specifies limit on AES GCM encryption operations with same key and random IV/nonce
    protected static final long GCM_RANDOM_IV_SAME_KEY_MAX_OPS = 1L << 32;
    // NIST SP 800-38A doesn't specify limit on AES CTR operations.
    // However, Parquet uses a random IV (with 12-byte random nonce). To avoid repetition due to "birthday problem",
    // setting a conservative limit equal to GCM's value for random IVs
    protected static final long CTR_RANDOM_IV_SAME_KEY_MAX_OPS = GCM_RANDOM_IV_SAME_KEY_MAX_OPS;
    static final int AAD_FILE_UNIQUE_LENGTH = 8;
    protected final SecureRandom randomGenerator;
    protected final byte[] localNonce;
    protected SecretKeySpec aesKey;
    protected Cipher cipher;

    /**
     * Wraps the given raw AES key bytes for use by subclasses.
     *
     * @param keyBytes raw AES key material; must be non-null and not all zeros
     * @throws IllegalArgumentException if every key byte is zero (likely an uninitialized buffer,
     *         not real key material)
     */
    AesCipher(byte[] keyBytes)
    {
        requireNonNull(keyBytes, "key bytes cannot be null");
        boolean allZeroKey = true;
        for (byte kb : keyBytes) {
            if (kb != 0) {
                allZeroKey = false;
                break;
            }
        }

        if (allZeroKey) {
            throw new IllegalArgumentException("All key bytes are zero");
        }

        aesKey = new SecretKeySpec(keyBytes, "AES");
        randomGenerator = new SecureRandom();
        localNonce = new byte[NONCE_LENGTH];
    }

    /**
     * Builds the additional authenticated data (AAD) for a single encrypted module:
     * {@code fileAAD || moduleType || rowGroupOrdinal(LE16) || columnOrdinal(LE16) || pageOrdinal(LE16)},
     * where trailing components are omitted for module types that do not carry them
     * (footer modules carry none; only data pages and data page headers carry a page ordinal).
     *
     * @param fileAAD per-file AAD prefix shared by all modules of the file
     * @param moduleType the kind of module being encrypted/decrypted
     * @param rowGroupOrdinal zero-based row group index; ignored for footer modules
     * @param columnOrdinal zero-based column index; ignored for footer modules
     * @param pageOrdinal zero-based page index; used only for data pages and their headers
     * @throws IllegalArgumentException if a required ordinal is negative
     * @throws ParquetCryptoException if a required ordinal does not fit in a signed short
     */
    public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, int rowGroupOrdinal, int columnOrdinal, int pageOrdinal)
    {
        byte[] typeOrdinalBytes = new byte[1];
        typeOrdinalBytes[0] = moduleType.getValue();

        if (ModuleType.Footer == moduleType) {
            return concatByteArrays(fileAAD, typeOrdinalBytes);
        }

        byte[] rowGroupOrdinalBytes = checkedOrdinalBytesLE(rowGroupOrdinal, "row group", "row groups");
        byte[] columnOrdinalBytes = checkedOrdinalBytesLE(columnOrdinal, "column", "columns");

        if (ModuleType.DataPage != moduleType && ModuleType.DataPageHeader != moduleType) {
            return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes);
        }

        byte[] pageOrdinalBytes = checkedOrdinalBytesLE(pageOrdinal, "page", "pages per chunk");
        return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes, pageOrdinalBytes);
    }

    /**
     * Builds the AAD used to authenticate the file footer from the AAD prefix.
     * Footer AAD carries no row group / column / page ordinals.
     */
    public static byte[] createFooterAAD(byte[] aadPrefixBytes)
    {
        return createModuleAAD(aadPrefixBytes, ModuleType.Footer, -1, -1, -1);
    }

    /**
     * Updates the last two bytes of an existing page AAD with a new page ordinal,
     * avoiding re-creating the whole page AAD from scratch for each page.
     *
     * @throws IllegalArgumentException if the new page ordinal is negative
     * @throws ParquetCryptoException if the new page ordinal does not fit in a signed short
     */
    public static void quickUpdatePageAAD(byte[] pageAAD, int newPageOrdinal)
    {
        requireNonNull(pageAAD, "pageAAD cannot be null");
        byte[] pageOrdinalBytes = checkedOrdinalBytesLE(newPageOrdinal, "page", "pages per chunk");
        // Page ordinal is always the final (little-endian short) component of a page AAD
        System.arraycopy(pageOrdinalBytes, 0, pageAAD, pageAAD.length - 2, 2);
    }

    /**
     * Validates that an ordinal is non-negative and fits in a signed short, then returns
     * its 2-byte little-endian encoding. Shared by all AAD ordinal components, which the
     * Parquet encryption format stores as 16-bit values.
     *
     * @param ordinal the ordinal value to validate and encode
     * @param ordinalName singular noun used in the negative-ordinal error message
     * @param limitNoun plural noun used in the out-of-range error message
     * @throws IllegalArgumentException if {@code ordinal} is negative
     * @throws ParquetCryptoException if {@code ordinal} exceeds {@link Short#MAX_VALUE}
     */
    private static byte[] checkedOrdinalBytesLE(int ordinal, String ordinalName, String limitNoun)
    {
        if (ordinal < 0) {
            throw new IllegalArgumentException("Wrong " + ordinalName + " ordinal: " + ordinal);
        }
        short shortOrdinal = (short) ordinal;
        if (shortOrdinal != ordinal) {
            throw new ParquetCryptoException("Encrypted parquet files can't have more than %s " + limitNoun + ": %s", Short.MAX_VALUE, ordinal);
        }
        return shortToBytesLE(shortOrdinal);
    }

    /**
     * Concatenates the given byte arrays into one newly allocated array, in argument order.
     */
    static byte[] concatByteArrays(byte[]... arrays)
    {
        int totalLength = 0;
        for (byte[] array : arrays) {
            totalLength += array.length;
        }

        byte[] output = new byte[totalLength];
        int offset = 0;
        for (byte[] array : arrays) {
            System.arraycopy(array, 0, output, offset, array.length);
            offset += array.length;
        }

        return output;
    }

    /**
     * Encodes a short as two bytes in little-endian order (low byte first).
     */
    private static byte[] shortToBytesLE(short input)
    {
        byte[] output = new byte[2];
        output[1] = (byte) (0xff & (input >> 8));
        output[0] = (byte) (0xff & input);

        return output;
    }
}
Loading