Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public enum HiveErrorCode
HIVE_FUNCTION_INITIALIZATION_ERROR(49, EXTERNAL),
HIVE_METASTORE_INITIALIZE_SSL_ERROR(50, EXTERNAL),
UNKNOWN_TABLE_TYPE(51, EXTERNAL),
HIVE_PARTITION_NOT_FOUND(52, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.security.RoleGrant;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -58,7 +59,12 @@ default Optional<Table> getTable(MetastoreContext metastoreContext, HiveTableHan

void updateTableStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Function<PartitionStatistics, PartitionStatistics> update);

void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, String partitionName, Function<PartitionStatistics, PartitionStatistics> update);
default void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
{
updatePartitionStatistics(metastoreContext, databaseName, tableName, ImmutableMap.of(partitionName, update));
}

void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates);

Optional<List<String>> getAllTables(MetastoreContext metastoreContext, String databaseName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,15 +477,17 @@ public void updateTableStatistics(MetastoreContext metastoreContext, String data
}

@Override
public void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
public void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
{
try {
delegate.updatePartitionStatistics(metastoreContext, databaseName, tableName, partitionName, update);
delegate.updatePartitionStatistics(metastoreContext, databaseName, tableName, updates);
}
finally {
partitionStatisticsCache.asMap().keySet().stream()
.filter(partitionFilterKey -> partitionFilterKey.getKey().equals(hivePartitionName(databaseName, tableName, partitionName)))
.forEach(partitionStatisticsCache::invalidate);
updates.forEach((partitionName, update) -> {
HivePartitionName hivePartitionName = hivePartitionName(databaseName, tableName, partitionName);
partitionStatisticsCache.invalidate(getCachingKey(metastoreContext, hivePartitionName));
partitionCache.invalidate(getCachingKey(metastoreContext, hivePartitionName));
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ public static ProtectMode getProtectMode(Table table)
return getProtectMode(table.getParameters());
}

public static String makePartitionName(Table table, Partition partition)
{
return makePartName(table.getPartitionColumns(), partition.getValues());
}

public static String makePartName(List<Column> partitionColumns, List<String> values)
{
checkArgument(partitionColumns.size() == values.size(), "Partition value count does not match the partition column count");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ public void updateTableStatistics(MetastoreContext metastoreContext, String data
}

@Override
public void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
public void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
{
verifyRecordingMode();
delegate.updatePartitionStatistics(metastoreContext, databaseName, tableName, partitionName, update);
delegate.updatePartitionStatistics(metastoreContext, databaseName, tableName, updates);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.common.ErrorType.USER_ERROR;
Expand Down Expand Up @@ -107,6 +108,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
Expand Down Expand Up @@ -390,14 +392,16 @@ public synchronized void setTableStatistics(MetastoreContext metastoreContext, T
// TODO: Allow updating statistics for 2 tables in the same transaction
public synchronized void setPartitionStatistics(MetastoreContext metastoreContext, Table table, Map<List<String>, PartitionStatistics> partitionStatisticsMap)
{
Map<String, Function<PartitionStatistics, PartitionStatistics>> updates = partitionStatisticsMap.entrySet().stream().collect(
toImmutableMap(
entry -> getPartitionName(table, entry.getKey()),
entry -> oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, entry.getValue())));
setExclusive((delegate, hdfsEnvironment) -> {
partitionStatisticsMap.forEach((partitionValues, newPartitionStats) ->
delegate.updatePartitionStatistics(
metastoreContext,
table.getDatabaseName(),
table.getTableName(),
getPartitionName(table, partitionValues),
oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, newPartitionStats)));
delegate.updatePartitionStatistics(
metastoreContext,
table.getDatabaseName(),
table.getTableName(),
updates);
return EMPTY_HIVE_COMMIT_HANDLE;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,25 +359,27 @@ public synchronized void updateTableStatistics(MetastoreContext metastoreContext
}

@Override
public synchronized void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
public synchronized void updatePartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
{
PartitionStatistics originalStatistics = getPartitionStatistics(metastoreContext, databaseName, tableName, ImmutableSet.of(partitionName)).get(partitionName);
if (originalStatistics == null) {
throw new PrestoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partitionName);
}
PartitionStatistics updatedStatistics = update.apply(originalStatistics);
updates.forEach((partitionName, update) -> {
PartitionStatistics originalStatistics = getPartitionStatistics(metastoreContext, databaseName, tableName, ImmutableSet.of(partitionName)).get(partitionName);
if (originalStatistics == null) {
throw new PrestoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partitionName);
}
PartitionStatistics updatedStatistics = update.apply(originalStatistics);

Table table = getRequiredTable(metastoreContext, databaseName, tableName);
List<String> partitionValues = extractPartitionValues(partitionName);
Path partitionDirectory = getPartitionMetadataDirectory(table, partitionValues);
PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
.orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), partitionValues));
Table table = getRequiredTable(metastoreContext, databaseName, tableName);
List<String> partitionValues = extractPartitionValues(partitionName);
Path partitionDirectory = getPartitionMetadataDirectory(table, partitionValues);
PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
.orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), partitionValues));

PartitionMetadata updatedMetadata = partitionMetadata
.withParameters(updateStatisticsParameters(partitionMetadata.getParameters(), updatedStatistics.getBasicStatistics()))
.withColumnStatistics(updatedStatistics.getColumnStatistics());
PartitionMetadata updatedMetadata = partitionMetadata
.withParameters(updateStatisticsParameters(partitionMetadata.getParameters(), updatedStatistics.getBasicStatistics()))
.withColumnStatistics(updatedStatistics.getColumnStatistics());

writeSchemaFile("partition", partitionDirectory, partitionCodec, updatedMetadata, true);
writeSchemaFile("partition", partitionDirectory, partitionCodec, updatedMetadata, true);
});
}

@Override
Expand Down
Loading
Loading