Skip to content

Delta Core + Flink SQL + Partition Pushdown using the SupportsFilterPushdown hack #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: delta_core_with_flink_feature_branch
Choose a base branch
from
Open
  •  
  •  
  •  
15 changes: 15 additions & 0 deletions core/src/main/java/io/delta/standalone/expressions/And.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.delta.standalone.expressions;

import java.util.Collection;

import io.delta.core.internal.expressions.ExpressionErrors;
import io.delta.standalone.types.BooleanType;

Expand All @@ -10,6 +12,19 @@
*/
public final class And extends BinaryOperator implements Predicate {

/**
 * Combines two or more expressions into a single left-nested conjunction,
 * e.g. [e1, e2, e3] becomes And(And(e1, e2), e3).
 *
 * @param conjunctions the expressions to AND together; must contain at least 2 elements
 * @return the combined {@link And} expression
 * @throws IllegalArgumentException if fewer than 2 expressions are given
 */
public static And apply(Collection<Expression> conjunctions) {
    if (conjunctions.size() < 2) {
        throw new IllegalArgumentException("And.apply must be called with at least 2 elements");
    }

    // Reduce without an identity element so the result tree contains only the
    // caller's expressions — the previous And(Literal.True, Literal.True) seed
    // polluted every result with two redundant literal terms.
    return (And) conjunctions
        .stream()
        .reduce(And::new)
        .orElseThrow(() ->
            new IllegalArgumentException("And.apply must be called with at least 2 elements"));
}

public And(Expression left, Expression right) {
super(left, right, "&&");
if (!(left.dataType() instanceof BooleanType) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,12 @@ public static Literal ofNull(DataType dataType) {
throw new IllegalArgumentException(
dataType.getTypeName() + " is an invalid data type for Literal.");
}
return new Literal(null, dataType); }
return new Literal(null, dataType);
}

/**
 * Parses the given string into a {@link Literal} of the requested data type.
 *
 * <p>Currently only {@code LongType} is supported.
 *
 * @param value    the string to parse
 * @param dataType the target data type of the literal
 * @return a {@link Literal} holding the parsed value
 * @throws NumberFormatException         if {@code value} cannot be parsed as a long
 * @throws UnsupportedOperationException for any data type other than {@code LongType}
 */
public static Literal castString(String value, DataType dataType) {
    if (dataType instanceof LongType) {
        return Literal.of(Long.parseLong(value));
    }

    // UnsupportedOperationException (a RuntimeException subtype, so callers catching
    // RuntimeException still work) is more precise than a raw RuntimeException.
    throw new UnsupportedOperationException(
        "Literal.castString only supports LongType right now");
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/io/delta/standalone/expressions/Or.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.delta.standalone.expressions;

import java.util.Collection;

import io.delta.core.internal.expressions.ExpressionErrors;
import io.delta.standalone.types.BooleanType;

Expand All @@ -10,6 +12,19 @@
*/
public final class Or extends BinaryOperator implements Predicate {

/**
 * Combines two or more expressions into a single left-nested disjunction,
 * e.g. [e1, e2, e3] becomes Or(Or(e1, e2), e3).
 *
 * @param disjunctions the expressions to OR together; must contain at least 2 elements
 * @return the combined {@link Or} expression
 * @throws IllegalArgumentException if fewer than 2 expressions are given
 */
public static Or apply(Collection<Expression> disjunctions) {
    if (disjunctions.size() < 2) {
        throw new IllegalArgumentException("Or.apply must be called with at least 2 elements");
    }

    // Reduce without an identity element so the result tree contains only the
    // caller's expressions — the previous Or(Literal.False, Literal.False) seed
    // polluted every result with two redundant literal terms.
    return (Or) disjunctions
        .stream()
        .reduce(Or::new)
        .orElseThrow(() ->
            new IllegalArgumentException("Or.apply must be called with at least 2 elements"));
}

public Or(Expression left, Expression right) {
super(left, right, "||");
if (!(left.dataType() instanceof BooleanType) ||
Expand Down
43 changes: 33 additions & 10 deletions flink/src/main/java/io/delta/flink/internal/table/CatalogProxy.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.delta.flink.internal.table;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -85,21 +87,29 @@ public void alterTable(
}
}


@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
    throws TableNotExistException, TableNotPartitionedException, CatalogException {

    DeltaCatalogBaseTable catalogTable = getCatalogTable(tablePath);
    if (catalogTable.isDeltaTable()) {
        // Filter-pushdown experiment: report zero partitions so the planner treats
        // partition columns as ordinary data columns instead of pruning on them.
        // NOTE(review): this replaces the previous
        // CatalogException("Delta table connector does not support partition listing.")
        // — confirm the planner handles an empty listing the way this hack expects
        // before merging.
        return Collections.emptyList();
    } else {
        return this.decoratedCatalog.listPartitions(tablePath);
    }
}

// WE ARE TRYING TO DO FILTER PUSHDOWN WITHOUT LISTING PARTITIONS BY MAKING THE PLANNER
// THINK THAT PARTITIONS COLUMNS ARE ACTUALLY JUST DATA COLUMNS
@Override
public List<CatalogPartitionSpec> listPartitions(
ObjectPath tablePath,
Expand All @@ -113,28 +123,37 @@ public List<CatalogPartitionSpec> listPartitions(
// This information is needed to build CatalogPartitionSpec
throw new CatalogException(
"Delta table connector does not support partition listing.");
// System.out.println("Scott > listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)");
// return this.deltaCatalog.listPartitions(objectPathToFileSystemPath(tablePath), partitionSpec);
} else {
return this.decoratedCatalog.listPartitions(tablePath, partitionSpec);
}
}

// Filter-pushdown experiment: partition columns are surfaced to the planner as
// plain data columns, so this by-filter listing should never be reached for
// Delta tables; fail fast if it is.
@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(
    ObjectPath tablePath,
    List<Expression> filters) throws TableNotExistException, TableNotPartitionedException,
    CatalogException {

    DeltaCatalogBaseTable catalogTable = getCatalogTable(tablePath);
    if (catalogTable.isDeltaTable()) {
        // Delta standalone Metadata does not provide partition values, which are
        // required to build CatalogPartitionSpec instances.
        throw new UnsupportedOperationException(
            "Delta table connector does not support partition listing by filter.");
    } else {
        return this.decoratedCatalog.listPartitionsByFilter(tablePath, filters);
    }
}

// WE ARE TRYING TO DO FILTER PUSHDOWN WITHOUT LISTING PARTITIONS BY MAKING THE PLANNER
// THINK THAT PARTITIONS COLUMNS ARE ACTUALLY JUST DATA COLUMNS
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException {
Expand Down Expand Up @@ -260,8 +279,7 @@ public CatalogTableStatistics getPartitionStatistics(
throws PartitionNotExistException, CatalogException {

if (getCatalogTable(tablePath).isDeltaTable()) {
throw new CatalogException(
"Delta table connector does not support partition statistics.");
return this.deltaCatalog.getPartitionStatistics(objectPathToFileSystemPath(tablePath), tablePath, partitionSpec);
} else {
return this.decoratedCatalog.getPartitionStatistics(tablePath, partitionSpec);
}
Expand All @@ -274,8 +292,7 @@ public CatalogColumnStatistics getPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {

if (getCatalogTable(tablePath).isDeltaTable()) {
throw new CatalogException(
"Delta table connector does not support partition column statistics.");
return this.deltaCatalog.getPartitionColumnStatistics(objectPathToFileSystemPath(tablePath), tablePath, partitionSpec);
} else {
return this.decoratedCatalog.getPartitionColumnStatistics(tablePath, partitionSpec);
}
Expand Down Expand Up @@ -376,4 +393,10 @@ private DeltaCatalogBaseTable getCatalogTableUnchecked(ObjectPath tablePath)
CatalogBaseTable table = this.decoratedCatalog.getTable(tablePath);
return new DeltaCatalogBaseTable(tablePath, table);
}

// Resolves the catalog object path to the Delta table's filesystem location by
// reading the table-path connector option off the metastore table definition.
private String objectPathToFileSystemPath(ObjectPath tableObjectPath) {
    CatalogBaseTable metastoreTable =
        getCatalogTable(tableObjectPath).getCatalogTable();
    return metastoreTable
        .getOptions()
        .get(DeltaTableConnectorOptions.TABLE_PATH.key());
}
}
143 changes: 132 additions & 11 deletions flink/src/main/java/io/delta/flink/internal/table/DeltaCatalog.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package io.delta.flink.internal.table;

import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

import io.delta.flink.internal.table.DeltaCatalogTableHelper.DeltaMetastoreTable;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -335,4 +335,125 @@ public void alterTable(DeltaCatalogBaseTable newCatalogTable) {
DeltaCatalogTableHelper
.commitToDeltaLog(deltaLog, updatedMetadata, Operation.Name.SET_TABLE_PROPERTIES);
}

/**
 * Lists every distinct partition of the Delta table at the given filesystem path by
 * scanning the partition values of all live files in the latest snapshot.
 *
 * @param tablePath filesystem path of the Delta table
 * @return one {@link CatalogPartitionSpec} per distinct partition-value combination
 * @throws TableNotPartitionedException if the table has no partition columns
 */
List<CatalogPartitionSpec> listPartitions(String tablePath) throws TableNotExistException,
    TableNotPartitionedException,
    CatalogException {
    final DeltaLog log = DeltaLog.forTable(hadoopConf, tablePath);
    final Snapshot snapshot = log.update();

    if (snapshot.getMetadata().getPartitionColumns().isEmpty()) {
        // TODO: thread the ObjectPath through so the exception can identify the table.
        throw new TableNotPartitionedException(null, null);
    }

    // A Set de-duplicates specs: many files typically share the same partition values.
    final Set<CatalogPartitionSpec> output = new HashSet<>();
    snapshot.scan().getFiles().forEachRemaining(addFile ->
        output.add(new CatalogPartitionSpec(addFile.getPartitionValues())));

    return new ArrayList<>(output);
}

/**
 * Lists the partitions of the Delta table at {@code tablePath} whose partition values
 * contain every entry of the given (possibly partial) partition spec.
 *
 * <p>Previously a stub that returned {@code null}; callers of a listing API expect a
 * (possibly empty) list, never null.
 *
 * @param tablePath     filesystem path of the Delta table
 * @param partitionSpec partial or complete partition values to match
 * @return the matching partition specs; empty if none match
 */
public List<CatalogPartitionSpec> listPartitions(
    String tablePath,
    CatalogPartitionSpec partitionSpec)
    throws CatalogException, TableNotPartitionedException, TableNotExistException, PartitionSpecInvalidException {

    final Map<String, String> wanted = partitionSpec.getPartitionSpec();
    return listPartitions(tablePath).stream()
        // keep only specs that agree with every (column -> value) entry requested
        .filter(spec -> spec.getPartitionSpec().entrySet().containsAll(wanted.entrySet()))
        .collect(Collectors.toList());
}

/**
 * Listing partitions by Flink expression filter is not implemented yet for Delta tables.
 *
 * <p>Previously a stub that returned {@code null}, which any caller iterating the
 * result would NPE on; fail fast with a descriptive exception instead.
 *
 * @throws UnsupportedOperationException always
 */
public List<CatalogPartitionSpec> listPartitionsByFilter(
    String tablePath,
    List<Expression> filters) throws TableNotExistException, TableNotPartitionedException,
    CatalogException {
    throw new UnsupportedOperationException(
        "DeltaCatalog.listPartitionsByFilter is not implemented yet for table " + tablePath);
}

/**
 * Returns table statistics for a single partition of the Delta table at {@code tablePath}.
 *
 * <p>Statistics are derived from the AddFile entries whose partition values exactly
 * match {@code partitionSpec}. The row count is currently a placeholder (10 rows per
 * file) until per-file stats parsing is implemented.
 *
 * @param tablePath       filesystem path of the Delta table
 * @param tableObjectPath catalog object path, used only for error reporting
 * @param partitionSpec   exact partition values identifying the partition
 * @return statistics of the given partition
 * @throws PartitionNotExistException if no live file carries the given partition values
 *
 * TODO: cache the DeltaLog/snapshot per tablePath instead of reloading on every call.
 */
public CatalogTableStatistics getPartitionStatistics(
    String tablePath,
    ObjectPath tableObjectPath,
    CatalogPartitionSpec partitionSpec) throws PartitionNotExistException {
    final DeltaLog log = DeltaLog.forTable(hadoopConf, tablePath);
    final Snapshot snapshot = log.update();
    final List<AddFile> filesInPartition = new ArrayList<>();

    // TODO: inefficient full scan per call — cache, or translate partitionSpec into a
    // partition filter expression and push it into the scan.
    snapshot.scan().getFiles().forEachRemaining(addFile -> {
        if (addFile.getPartitionValues().equals(partitionSpec.getPartitionSpec())) {
            filesInPartition.add(addFile);
        }
    });

    if (filesInPartition.isEmpty()) {
        throw new PartitionNotExistException(
            "delta" /* catalog name */, tableObjectPath, partitionSpec);
    }

    long totalRowCount = 0;
    final int fileCount = filesInPartition.size();
    long totalSizeBytes = 0;
    long totalRawSizeBytes = 0;

    for (AddFile addFile : filesInPartition) {
        totalRowCount += 10; // TODO: parse per-file stats for the real row count
        totalSizeBytes += addFile.getSize();
        totalRawSizeBytes += addFile.getSize();
    }

    return new CatalogTableStatistics(totalRowCount, fileCount, totalSizeBytes, totalRawSizeBytes);
}

/**
 * Returns column statistics for a single partition.
 *
 * <p>WARNING: placeholder implementation — returns hard-coded statistics for a column
 * named "id" regardless of the table's actual schema or the requested partition.
 * Replace with real per-file stats parsing before relying on these numbers.
 *
 * @param tablePath       filesystem path of the Delta table
 * @param tableObjectPath catalog object path (currently unused)
 * @param partitionSpec   partition values identifying the partition (currently unused)
 * @return hard-coded column statistics for the "id" column
 */
public CatalogColumnStatistics getPartitionColumnStatistics(
    String tablePath,
    ObjectPath tableObjectPath,
    CatalogPartitionSpec partitionSpec) throws PartitionNotExistException {

    // TODO: parse file-level stats from the Delta log instead of these constants.
    final Long min = 0L;
    final Long max = 100L;
    final Long numDistinctValues = 10L;
    final Long nullCount = 0L;

    final CatalogColumnStatisticsDataBase stat =
        new CatalogColumnStatisticsDataLong(min, max, numDistinctValues, nullCount);
    final Map<String, CatalogColumnStatisticsDataBase> columnStats = new HashMap<>();
    columnStats.put("id", stat);
    return new CatalogColumnStatistics(columnStats);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class DeltaCatalogBaseTable {

private final boolean isDeltaTable;

// TODO would it be helpful to put the filesystem path in here?

public DeltaCatalogBaseTable(ObjectPath tableCatalogPath, CatalogBaseTable catalogTable) {
checkNotNull(tableCatalogPath, "Object path cannot be null for DeltaCatalogBaseTable.");
checkNotNull(catalogTable, "Catalog table cannot be null for DeltaCatalogBaseTable.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
System.out.println("Scott > DeltaDynamicTableFactory > createDynamicTableSource :: context " + context.toString());

if (!isFromCatalog) {
throw notFromDeltaCatalogException();
Expand Down Expand Up @@ -167,6 +168,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
.getLogicalType()
).getFieldNames();

System.out.println("Scott > DeltaDynamicTableFactory > creating DeltaDynamicTableSource");

return new DeltaDynamicTableSource(
hadoopConf,
options, // PR FlinkSql_PR_8 change to queryOptions
Expand Down
Loading