diff --git a/.travis.yml b/.travis.yml
index 71ba451a0ece3..b68bd71770e68 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,27 +29,43 @@ dist: trusty
cache:
directories:
- $HOME/.m2/repository
+ - $HOME/.thrift
services:
- docker
+before_install:
+ # Build Thrift 0.9.3 from source only when the copy cached under $HOME/.thrift is missing.
+ - |
+ if [[ ! -e $HOME/.thrift/bin/thrift ]]; then
+ # -y keeps apt-get from waiting on a confirmation prompt in the unattended build.
+ sudo apt-get install -y libboost-dev libboost-test-dev libboost-program-options-dev libboost-filesystem-dev libboost-thread-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
+ # Superseded releases are only guaranteed to stay available on the Apache archive host.
+ wget https://archive.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz
+ tar xfz thrift-0.9.3.tar.gz
+ cd thrift-0.9.3 && ./configure --without-cpp --without-c_glib --without-python --without-ruby --without-php --without-erlang --without-go --without-nodejs -q --prefix=$HOME/.thrift
+ sudo make install > thrift_make_install.log
+ cd ..
+ fi
+ # Expose the cached binary as /usr/local/bin/thrift.
+ - |
+ if [[ ! -e /usr/local/bin/thrift ]]; then
+ # -f replaces a dangling link, for which the -e test above is false and plain "ln -s" would fail.
+ sudo ln -sf $HOME/.thrift/bin/thrift /usr/local/bin/thrift
+ fi
+
install:
+ # The profile negation is quoted, as in the MAVEN_CHECKS script step, so the shell can never
+ # treat "!" as history expansion; Maven receives "!twitter-modules" and deactivates that profile.
- ./mvnw -v
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl $TEST_SPECIFIC_MODULES -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P '!twitter-modules' -pl $TEST_SPECIFIC_MODULES -am
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P '!twitter-modules' -pl '!presto-docs,!presto-server,!presto-server-rpm'
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT || -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT || -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P '!twitter-modules' -pl '!presto-docs,!presto-server-rpm'
fi
- |
if [[ -v HIVE_TESTS ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl presto-hive-hadoop2 -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P '!twitter-modules' -pl presto-hive-hadoop2 -am
fi
before_script:
@@ -71,15 +87,15 @@ before_script:
script:
+ # Every step passes the profile negation quoted ('!twitter-modules') so that "!" is never
+ # subject to shell history expansion and all steps use the same spelling.
- |
if [[ -v MAVEN_CHECKS ]]; then
- ./mvnw install -DskipTests -B -T C1 -P ci
+ ./mvnw install -DskipTests -B -T C1 -P 'ci,!twitter-modules'
fi
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P '!twitter-modules' -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_OTHER_MODULES
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P '!twitter-modules' -B -pl $TEST_OTHER_MODULES
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT ]]; then
@@ -122,7 +138,7 @@ script:
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
presto-product-tests/bin/run_on_docker.sh \
- multinode-tls -g smoke,cli,group-by,join,tls
+ multinode-tls -g smoke,group-by,join,tls
fi
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
@@ -177,8 +193,11 @@ before_cache:
- rm -rf $HOME/.m2/repository/com/facebook
notifications:
+ hipchat:
+ rooms:
+ secure: peNh1KxwlxIpFyb60S8AMvaJThgh1LsjE+Whf1rYkJalVd2wUrqBIoyDKVSueyHD01hQ06gT7rBV6Pu/QcBMR1a9BbMCjERfxLZFUAheuC2Rsb+p1c4dyvBcFUGacgW7XWKCaVYGDGxuUvb0I3Z8cR6KxhK2xi88tHiqBGVGV2yI6zzOTpWVknMfFBtn+ONU1Ob2P6trclXaDyFd4MxubULri6CQdl35eQAq/VnmR3SZOgyVu3V30MGKwI3zhSli+3VqmW0JmaDGoHN6gznM1+VqABLgmIq0P+n+r5gdZWRCorq10NZCFMhVQ8U6rQHcL7sAniYJJsC/yRt6+pjyzIF4N+LSzZ7T+FLxQqT7k/1ukNgrujLDfTpn76Mo9eYTZmfAdzbm1QKJDACwr8Slqhq1jGzcrFMHunvXhVqjOs24R+JAHblY0O9PXvv7aR29GOQWDCvD7nV5QBUr8Xz5q7ozbLqHTI+yH02Jj4EaZ+azWYdRmnr9wDBxWMYBEgOdj4pII9b298XEDB72TxA3KpLTpdLxBTR+gIk/LjJqb/wb84xUv8gPXkaXccltGd5YI90c84cX8isbzNkAylzyfF2Eyueh0XbnMHfpFqBS7qaVM0/D+UxZkU0WNJ0x7G9XJvkiq49bZz2q1KLE4XuvVnTZSSjVSUAS8RtHfwUV33c=
slack:
- secure: V5eyoGShxFoCcYJcp858vf/T6gC9KeMxL0C1EElcpZRcKBrIVZzvhek3HLHxZOxlghqnvNVsyDtU3u5orkEaAXeXj5c2dN+4XBsAB9oeN5MtQ0Z3VLAhZDqKIW1LzcXrq4DpzM0PkGhjfjum/P94/qFYk0UckPtB6a341AuYRo8=
+ secure: E7XVlbdwIdKxnr6Tk1rmCefufs1w8h4nCWz79Uh6wMma8gC7x5ChKFqwvLRJ0WUpmPS+Ng1xeTv+wmb8TMDv2X8snmht9420/TFRy9wi1aLWNJXQUveNBzn83sCS40jFi6gd9xqKawd68R84UVH3PeNhksDtDnKAblx71miwbKmLwHc1KFoLMEnaaWEg5NgFl8/UadYDvsLD44v6YDza8eYrLp3aGK8v9ewBDySHE16IHAfpteTRaU0kG/H1kvVvFdH/h/sSPfimehd51b4i3mm/nRrjJ/VSLc7p9w5FkHUECtA0N6zcytRxN6MrbhrxJ8XG3vte3KSRSFCqfgOSRM2NWcca4CtBP2V0SwrAYMo5jim6fr921lfcbUTWTSnvMYLC17QrAxoclVrgK05GjGoLgSH42UPGf3QNkqXzyueNzaLJ+KSlgwFblIQKp6WGZYSRorL0F7s50pIoqMVoebcrnB0ObK/CcE2ywS/HeTgoSkWSDSmKBsO+cmtv1yAamy9DlmgRGZlxIxdBELXtHRkQ2B6Z2QdiQU4MHiFBc/IESJbnCait4odn+oJUjehZg+b9vjCoWwVw3zNMIJhokyxO8SiyKJmbO0z1g2L/BykWGI1DQu8HkeQzO+CmNUV3AOrxDG3amL/tkB/06fyQtnYMDhUhvX64uWSaE36sYL4=
before_deploy:
- mkdir /tmp/artifacts
diff --git a/README.md b/README.md
index 800bfddc21c0f..8b572bf54111f 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
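-# Presto [![Build Status](https://travis-ci.org/prestodb/presto.svg?branch=master)](https://travis-ci.org/prestodb/presto)
+# Presto [![Build Status](https://travis-ci.org/twitter-forks/presto.svg)](https://travis-ci.org/twitter-forks/presto)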
Presto is a distributed SQL query engine for big data.
diff --git a/pom.xml b/pom.xml
index 987a85225568c..ffbaac06cfafb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
com.facebook.presto
presto-root
- 0.208
+ 0.208-tw-0.53
pom
presto-root
@@ -28,10 +28,10 @@
- scm:git:git://github.com/facebook/presto.git
- https://github.com/facebook/presto
- 0.208
-
+ scm:git:git://github.com/twitter-forks/presto.git
+ https://github.com/twitter-forks/presto
+ 0.208-tw-0.53
+
${project.basedir}
@@ -117,6 +117,7 @@
presto-thrift-testing-server
presto-thrift-connector
presto-matching
+ presto-twitter-functions
presto-memory-context
presto-proxy
@@ -801,6 +802,12 @@
3.6.1
+
+ org.apache.commons
+ commons-pool2
+ 2.4.2
+
+
io.airlift.discovery
discovery-server
@@ -889,6 +896,54 @@
1.1.2.6
+
+ org.apache.curator
+ curator-recipes
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-framework
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-client
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-test
+ 2.12.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
org.apache.zookeeper
zookeeper
@@ -909,6 +964,26 @@
+
+ com.101tec
+ zkclient
+ 0.10
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
org.jgrapht
jgrapht-core
@@ -1224,6 +1299,17 @@
+
+ twitter-modules
+
+ true
+
+
+ presto-kafka07
+ twitter-eventlistener-plugin
+ presto-twitter-server
+
+
tests-with-dependencies
io.airlift
@@ -204,6 +265,18 @@
+
+ org.apache.curator
+ curator-test
+ test
+
+
+
+ com.101tec
+ zkclient
+ test
+
+
com.facebook.presto
presto-main
@@ -252,12 +325,6 @@
test
-
- org.anarres.lzo
- lzo-hadoop
- test
-
-
com.facebook.presto
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
index 895a5e53c1a0c..ef94a1e238ff4 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
@@ -100,6 +100,7 @@ public class HiveClientConfig
private boolean useParquetColumnNames;
private boolean parquetOptimizedReaderEnabled = true;
private boolean parquetPredicatePushdownEnabled = true;
+ private boolean parquetNestedFieldsProjectionPushdownEnabled;
private boolean assumeCanonicalPartitionKeys;
@@ -679,6 +680,18 @@ public HiveClientConfig setParquetPredicatePushdownEnabled(boolean parquetPredic
return this;
}
+ /**
+ * Returns the nested-fields projection pushdown flag for Parquet.
+ * The backing field is declared without an initializer, so the flag is {@code false} unless it is set.
+ */
+ public boolean isParquetNestedFieldsProjectionPushdownEnabled()
+ {
+ return parquetNestedFieldsProjectionPushdownEnabled;
+ }
+
+ /**
+ * Sets the flag bound to the {@code hive.parquet-nested-fields-projection-pushdown.enabled} configuration property.
+ *
+ * @param parquetNestedFieldsProjectionPushdownEnabled new value of the flag
+ * @return this config object, so that setters can be chained
+ */
+ @Config("hive.parquet-nested-fields-projection-pushdown.enabled")
+ public HiveClientConfig setParquetNestedFieldsProjectionPushdownEnabled(boolean parquetNestedFieldsProjectionPushdownEnabled)
+ {
+ this.parquetNestedFieldsProjectionPushdownEnabled = parquetNestedFieldsProjectionPushdownEnabled;
+ return this;
+ }
+
@Deprecated
public boolean isParquetOptimizedReaderEnabled()
{
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
index b0bb04cb410a3..a9f7b8aa74c7b 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
@@ -23,6 +23,9 @@
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.twitter.hive.thrift.HiveThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftHiveRecordCursorProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@@ -72,9 +75,11 @@ public void configure(Binder binder)
binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class, connectorId));
+ binder.bind(ThriftFieldIdResolverFactory.class).toInstance(new HiveThriftFieldIdResolverFactory());
Multibinder recordCursorProviderBinder = newSetBinder(binder, HiveRecordCursorProvider.class);
recordCursorProviderBinder.addBinding().to(ParquetRecordCursorProvider.class).in(Scopes.SINGLETON);
+ recordCursorProviderBinder.addBinding().to(ThriftHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
index 0b153e7f71814..1d30bfc1d8f85 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
@@ -19,9 +19,11 @@
import com.facebook.presto.spi.type.TypeSignature;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
@@ -29,7 +31,6 @@
import static com.facebook.presto.hive.HiveType.HIVE_LONG;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
-import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -56,15 +57,22 @@ public enum ColumnType
}
private final String name;
+ private final Optional> fieldSet;
private final HiveType hiveType;
private final TypeSignature typeName;
private final int hiveColumnIndex;
private final ColumnType columnType;
private final Optional comment;
+ public HiveColumnHandle(String name, HiveType hiveType, TypeSignature typeSignature, int hiveColumnIndex, ColumnType columnType, Optional comment)
+ {
+ this(name, Optional.empty(), hiveType, typeSignature, hiveColumnIndex, columnType, comment);
+ }
+
@JsonCreator
public HiveColumnHandle(
@JsonProperty("name") String name,
+ @JsonProperty("fieldSet") Optional> fieldSet,
@JsonProperty("hiveType") HiveType hiveType,
@JsonProperty("typeSignature") TypeSignature typeSignature,
@JsonProperty("hiveColumnIndex") int hiveColumnIndex,
@@ -72,6 +80,7 @@ public HiveColumnHandle(
@JsonProperty("comment") Optional comment)
{
this.name = requireNonNull(name, "name is null");
+ this.fieldSet = requireNonNull(fieldSet, "fieldSet is null");
checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "hiveColumnIndex is negative");
this.hiveColumnIndex = hiveColumnIndex;
this.hiveType = requireNonNull(hiveType, "hiveType is null");
@@ -86,6 +95,12 @@ public String getName()
return name;
}
+ @JsonProperty
+ public Optional> getFieldSet()
+ {
+ return fieldSet;
+ }
+
@JsonProperty
public HiveType getHiveType()
{
@@ -134,7 +149,7 @@ public ColumnType getColumnType()
@Override
public int hashCode()
{
- return Objects.hash(name, hiveColumnIndex, hiveType, columnType, comment);
+ return Objects.hash(name, fieldSet, hiveColumnIndex, hiveType, columnType, comment);
}
@Override
@@ -148,6 +163,7 @@ public boolean equals(Object obj)
}
HiveColumnHandle other = (HiveColumnHandle) obj;
return Objects.equals(this.name, other.name) &&
+ Objects.equals(this.fieldSet, other.fieldSet) &&
Objects.equals(this.hiveColumnIndex, other.hiveColumnIndex) &&
Objects.equals(this.hiveType, other.hiveType) &&
Objects.equals(this.columnType, other.columnType) &&
@@ -157,14 +173,7 @@ public boolean equals(Object obj)
@Override
public String toString()
{
- return toStringHelper(this)
- .add("name", name)
- .add("hiveType", hiveType)
- .add("hiveColumnIndex", hiveColumnIndex)
- .add("columnType", columnType)
- .add("comment", comment.orElse(null))
- .omitNullValues()
- .toString();
+ return name + ":" + fieldSet.orElse(ImmutableSet.of()) + ":" + hiveType + ":" + hiveColumnIndex + ":" + columnType;
}
public static HiveColumnHandle updateRowIdHandle()
@@ -202,4 +211,9 @@ public static boolean isBucketColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == BUCKET_COLUMN_INDEX;
}
+
+ /**
+ * Creates a copy of {@code column} that carries the given field set; every other property is taken over unchanged.
+ */
+ public static HiveColumnHandle withFieldSet(HiveColumnHandle column, Optional> fieldSet)
+ {
+ return new HiveColumnHandle(
+ column.getName(),
+ fieldSet,
+ column.getHiveType(),
+ column.getTypeSignature(),
+ column.getHiveColumnIndex(),
+ column.getColumnType(),
+ column.getComment());
+ }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
index d41568db76162..e3223e3942ce6 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
@@ -113,6 +113,7 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.updateRowIdHandle;
+import static com.facebook.presto.hive.HiveColumnHandle.withFieldSet;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
@@ -1398,20 +1399,44 @@ public List getTableLayouts(ConnectorSession session
HiveTableHandle handle = (HiveTableHandle) tableHandle;
HivePartitionResult hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint);
+ HiveTableLayoutHandle layoutHandle = new HiveTableLayoutHandle(
+ handle.getSchemaTableName(),
+ ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
+ getPartitionsAsList(hivePartitionResult),
+ hivePartitionResult.getCompactEffectivePredicate(),
+ hivePartitionResult.getEnforcedConstraint(),
+ hivePartitionResult.getBucketHandle(),
+ hivePartitionResult.getBucketFilter());
+
+ if (constraint.fieldSets().isPresent()) {
+ return ImmutableList.of(new ConnectorTableLayoutResult(
+ pruneColumnFields(layoutHandle, constraint),
+ constraint.getSummary()));
+ }
+
return ImmutableList.of(new ConnectorTableLayoutResult(
- getTableLayout(
- session,
- new HiveTableLayoutHandle(
- handle.getSchemaTableName(),
- ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
- getPartitionsAsList(hivePartitionResult),
- hivePartitionResult.getCompactEffectivePredicate(),
- hivePartitionResult.getEnforcedConstraint(),
- hivePartitionResult.getBucketHandle(),
- hivePartitionResult.getBucketFilter())),
+ getTableLayout(session, layoutHandle),
hivePartitionResult.getUnenforcedConstraint()));
}
+ /**
+ * Builds a table layout whose column list is derived from the field sets carried by {@code constraint}.
+ *
+ * Field sets whose column handle already has a field set are dropped; each remaining column is re-created
+ * through {@code withFieldSet} with the fields requested for it.
+ * NOTE(review): the generic type arguments are missing from this view of the file; restore them from the real source.
+ */
+ private ConnectorTableLayout pruneColumnFields(HiveTableLayoutHandle layoutHandle, Constraint constraint)
+ {
+ // skip columns that were already narrowed, then attach the requested fields to the rest
+ Optional> columns = constraint.fieldSets()
+ .map(fieldsSets -> fieldsSets.stream()
+ .filter(fieldSet -> !((HiveColumnHandle) fieldSet.getColumn()).getFieldSet().isPresent())
+ .map(fieldSet -> withFieldSet((HiveColumnHandle) fieldSet.getColumn(), Optional.of(fieldSet.getFields())))
+ .collect(toImmutableList()));
+
+ // apart from the handle and the columns, the layout is built from all/empty placeholder values
+ return new ConnectorTableLayout(
+ layoutHandle,
+ columns,
+ TupleDomain.all(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ emptyList());
+ }
+
@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
{
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java
index a8ea96480e6d9..6d2bc4b37382c 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java
@@ -356,6 +356,7 @@ public static List toColumnHandles(List regular
}
return new HiveColumnHandle(
columnHandle.getName(),
+ columnHandle.getFieldSet(),
columnMapping.getCoercionFrom().get(),
columnMapping.getCoercionFrom().get().getTypeSignature(),
columnHandle.getHiveColumnIndex(),
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
index 62988901396a5..0490ee2611fa3 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
@@ -64,6 +64,7 @@ public final class HiveSessionProperties
private static final String PARQUET_PREDICATE_PUSHDOWN_ENABLED = "parquet_predicate_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
+ private static final String PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_READER_ENABLED = "parquet_nested_fields_projection_pushdown_enabled";
private static final String MAX_SPLIT_SIZE = "max_split_size";
private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
public static final String RCFILE_OPTIMIZED_WRITER_ENABLED = "rcfile_optimized_writer_enabled";
@@ -221,6 +222,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Experimental: Parquet: Access Parquet columns using names from the file",
hiveClientConfig.isUseParquetColumnNames(),
false),
+ booleanProperty(
+ PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_READER_ENABLED,
+ "Experimental: Parquet: Enable nested fields projection pushdown for Parquet",
+ hiveClientConfig.isParquetNestedFieldsProjectionPushdownEnabled(),
+ false),
dataSizeSessionProperty(
MAX_SPLIT_SIZE,
"Max split size",
@@ -398,6 +404,11 @@ public static boolean isUseParquetColumnNames(ConnectorSession session)
return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class);
}
+ /**
+ * Reads the {@code parquet_nested_fields_projection_pushdown_enabled} session property as a boolean.
+ */
+ public static boolean isParquetNestedFieldsProjectionPushdownEnabled(ConnectorSession session)
+ {
+ return session.getProperty(PARQUET_NESTED_FIELDS_PROJECTION_PUSHDOWN_READER_ENABLED, Boolean.class);
+ }
+
public static DataSize getMaxSplitSize(ConnectorSession session)
{
return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
index 4e4523d041859..5a9edf5b383ce 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
@@ -14,6 +14,7 @@
package com.facebook.presto.hive;
import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -32,6 +33,7 @@
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -93,6 +95,11 @@ public enum HiveStorageFormat
LazySimpleSerDe.class.getName(),
TextInputFormat.class.getName(),
HiveIgnoreKeyTextOutputFormat.class.getName(),
+ new DataSize(8, Unit.MEGABYTE)),
+ THRIFTBINARY(
+ LazyBinarySerDe.class.getName(),
+ ThriftGeneralInputFormat.class.getName(),
+ HiveIgnoreKeyTextOutputFormat.class.getName(),
new DataSize(8, Unit.MEGABYTE));
private final String serde;
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
index 8c774ebd8cb14..9c895eeeeeba9 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
@@ -28,6 +28,7 @@
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
+import com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
@@ -40,6 +41,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
@@ -119,6 +121,7 @@
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.transform;
+import static com.hadoop.compression.lzo.LzoIndex.LZO_INDEX_SUFFIX;
import static java.lang.Byte.parseByte;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
@@ -155,6 +158,22 @@ public final class HiveUtil
private static final String BIG_DECIMAL_POSTFIX = "BD";
+ // Accepts paths whose string form ends with ".lzo".
+ private static final PathFilter LZOP_DEFAULT_SUFFIX_FILTER = path -> path.toString().endsWith(".lzo");
+
+ // Accepts paths whose string form ends with ".lzo.index".
+ private static final PathFilter LZOP_INDEX_DEFAULT_SUFFIX_FILTER = path -> path.toString().endsWith(".lzo.index");
+
static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
DateTimeFormat.forPattern("yyyy-M-d").getParser(),
@@ -187,7 +206,7 @@ private HiveUtil()
// propagate serialization configuration to getRecordReader
schema.stringPropertyNames().stream()
- .filter(name -> name.startsWith("serialization."))
+ .filter(name -> name.startsWith("serialization.") || name.startsWith("elephantbird."))
.forEach(name -> jobConf.set(name, schema.getProperty(name)));
// add Airlift LZO and LZOP to head of codecs list so as to not override existing entries
@@ -261,6 +280,11 @@ public static void setReadColumns(Configuration configuration, List rea
return MapredParquetInputFormat.class;
}
+ // Remove this after https://github.com/twitter/elephant-bird/pull/481 is included in a release
+ if ("com.twitter.elephantbird.mapred.input.HiveMultiInputFormat".equals(inputFormatName)) {
+ return ThriftGeneralInputFormat.class;
+ }
+
Class> clazz = conf.getClassByName(inputFormatName);
return (Class extends InputFormat, ?>>) clazz.asSubclass(InputFormat.class);
}
@@ -313,6 +337,21 @@ public static boolean isSplittable(InputFormat, ?> inputFormat, FileSystem fil
}
}
+ /**
+ * Returns whether {@code filePath} is accepted by {@code LZOP_DEFAULT_SUFFIX_FILTER}, i.e. its string form ends with ".lzo".
+ */
+ public static boolean isLzopCompressedFile(Path filePath)
+ {
+ return LZOP_DEFAULT_SUFFIX_FILTER.accept(filePath);
+ }
+
+ /**
+ * Returns whether {@code filePath} is accepted by {@code LZOP_INDEX_DEFAULT_SUFFIX_FILTER}, i.e. its string form ends with ".lzo.index".
+ */
+ public static boolean isLzopIndexFile(Path filePath)
+ {
+ return LZOP_INDEX_DEFAULT_SUFFIX_FILTER.accept(filePath);
+ }
+
+ /**
+ * Derives the index path for {@code lzoPath} by appending {@code LZO_INDEX_SUFFIX} to it.
+ * NOTE(review): presumably the suffix is ".index", matching the index filter above; confirm against LzoIndex.
+ */
+ public static Path getLzopIndexPath(Path lzoPath)
+ {
+ return lzoPath.suffix(LZO_INDEX_SUFFIX);
+ }
+
public static StructObjectInspector getTableObjectInspector(Properties schema)
{
return getTableObjectInspector(getDeserializer(schema));
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
index 8a161a9d468be..9d4fc92bfe8ee 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
@@ -528,6 +528,17 @@ public static boolean isViewFileSystem(HdfsContext context, HdfsEnvironment hdfs
}
}
+ /**
+ * Checks whether the raw file system behind {@code path} is an
+ * {@code org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem}; the class is matched by name.
+ *
+ * @throws PrestoException with {@code HIVE_FILESYSTEM_ERROR} when the file system cannot be obtained
+ */
+ public static boolean isHDFSCompatibleViewFileSystem(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
+ {
+ try {
+ FileSystem rawFileSystem = getRawFileSystem(hdfsEnvironment.getFileSystem(context, path));
+ String fileSystemClassName = rawFileSystem.getClass().getName();
+ return "org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem".equals(fileSystemClassName);
+ }
+ catch (IOException e) {
+ throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed checking path: " + path, e);
+ }
+ }
+
private static FileSystem getRawFileSystem(FileSystem fileSystem)
{
if (fileSystem instanceof FilterFileSystem) {
@@ -556,6 +567,11 @@ public static Path createTemporaryPath(HdfsContext context, HdfsEnvironment hdfs
temporaryPrefix = ".hive-staging";
}
+ // use relative temporary directory on HDFSCompatibleViewFileSystem
+ if (isHDFSCompatibleViewFileSystem(context, hdfsEnvironment, targetPath)) {
+ temporaryPrefix = "../.hive-staging";
+ }
+
// create a temporary directory on the same filesystem
Path temporaryRoot = new Path(targetPath, temporaryPrefix);
Path temporaryPath = new Path(temporaryRoot, randomUUID().toString());
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
index 911ee8f3712ba..22e1a8127817d 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
@@ -137,16 +137,18 @@ public V run(String callableName, Callable callable)
return callable.call();
}
catch (Exception e) {
+ log.debug("Failed on executing %s with attempt %d, Exception: %s", callableName, attempt, e.getMessage());
e = exceptionMapper.apply(e);
for (Class extends Exception> clazz : exceptionWhiteList) {
if (clazz.isInstance(e)) {
+ log.debug("Exception is in whitelist.");
throw e;
}
}
if (attempt >= maxAttempts || Duration.nanosSince(startTime).compareTo(maxRetryTime) >= 0) {
+ log.debug("Maximum attempts or maximum retry time reached. attempt: %d, maxAttempts: %d, duration: [%s] maxRetryTime: [%s]", attempt, maxAttempts, Duration.nanosSince(startTime).toString(), maxRetryTime.toString());
throw e;
}
- log.debug("Failed on executing %s with attempt %d, will retry. Exception: %s", callableName, attempt, e.getMessage());
int delayInMs = (int) Math.min(minSleepTime.toMillis() * Math.pow(scaleFactor, attempt - 1), maxSleepTime.toMillis());
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayInMs * 0.1)));
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftMetastoreModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftMetastoreModule.java
index 62d94b3875ba0..1ba8ea19281de 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftMetastoreModule.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftMetastoreModule.java
@@ -16,17 +16,21 @@
import com.facebook.presto.hive.ForCachingHiveMetastore;
import com.facebook.presto.hive.metastore.CachingHiveMetastore;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
+import com.facebook.presto.twitter.hive.MetastoreStaticClusterModule;
+import com.facebook.presto.twitter.hive.MetastoreZkDiscoveryBasedModule;
+import com.facebook.presto.twitter.hive.PooledHiveMetastoreClientFactory;
+import com.facebook.presto.twitter.hive.ZookeeperServersetMetastoreConfig;
import com.google.inject.Binder;
-import com.google.inject.Module;
import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
-import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.configuration.ConditionalModule.installModuleIf;
import static java.util.Objects.requireNonNull;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
public class ThriftMetastoreModule
- implements Module
+ extends AbstractConfigurationAwareModule
{
private final String connectorId;
@@ -36,11 +40,11 @@ public ThriftMetastoreModule(String connectorId)
}
@Override
- public void configure(Binder binder)
+ public void setup(Binder binder)
{
binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
- binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
- configBinder(binder).bindConfig(StaticMetastoreConfig.class);
+ binder.bind(PooledHiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
+ bindMetastoreClusterModule();
binder.bind(HiveMetastore.class).to(ThriftHiveMetastore.class).in(Scopes.SINGLETON);
binder.bind(ExtendedHiveMetastore.class).annotatedWith(ForCachingHiveMetastore.class).to(BridgingHiveMetastore.class).in(Scopes.SINGLETON);
@@ -50,4 +54,16 @@ public void configure(Binder binder)
newExporter(binder).export(ExtendedHiveMetastore.class)
.as(generatedNameOf(CachingHiveMetastore.class, connectorId));
}
+
+ /**
+ * Installs exactly one metastore cluster module, chosen from {@code ZookeeperServersetMetastoreConfig}:
+ * {@code MetastoreStaticClusterModule} when no ZooKeeper server host-and-port is configured,
+ * {@code MetastoreZkDiscoveryBasedModule} otherwise.
+ */
+ private void bindMetastoreClusterModule()
+ {
+ // the two predicates are complementary, so only one of the modules is ever installed
+ install(installModuleIf(
+ ZookeeperServersetMetastoreConfig.class,
+ zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() == null,
+ new MetastoreStaticClusterModule()));
+ install(installModuleIf(
+ ZookeeperServersetMetastoreConfig.class,
+ zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() != null,
+ new MetastoreZkDiscoveryBasedModule()));
+ }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactory.java
index c40ebcf00f44a..215ee05a69dcb 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactory.java
@@ -263,7 +263,7 @@ private static List getPhysicalHiveColumnHandles(List field = fields.get(fieldId);
int fieldIndex;
if (useParquetColumnNames) {
- fieldIndex = getFieldIndex(fileSchema, columnNames.get(fieldId));
+ fieldIndex = findFieldIndexByName(fileSchema, columnNames.get(fieldId));
}
else {
fieldIndex = hiveColumnIndexes[fieldId];
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
index 5dd5a33f081be..bda7893c45483 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
@@ -54,6 +54,7 @@
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
+import static com.facebook.presto.hive.HiveSessionProperties.isParquetNestedFieldsProjectionPushdownEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetOptimizedReaderEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetPredicatePushdownEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
@@ -61,7 +62,7 @@
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getDescriptors;
-import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getParquetType;
+import static com.facebook.presto.hive.parquet.ParquetTypeUtils.getPrunedParquetType;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.buildParquetPredicate;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.getParquetTupleDomain;
import static com.facebook.presto.hive.parquet.predicate.ParquetPredicateUtils.predicateMatches;
@@ -123,6 +124,7 @@ public Optional extends ConnectorPageSource> createPageSource(
schema,
columns,
isUseParquetColumnNames(session),
+ isParquetNestedFieldsProjectionPushdownEnabled(session),
typeManager,
isParquetPredicatePushdownEnabled(session),
effectivePredicate,
@@ -140,6 +142,7 @@ public static ParquetPageSource createParquetPageSource(
Properties schema,
List columns,
boolean useParquetColumnNames,
+ boolean pruneNestedFields,
TypeManager typeManager,
boolean predicatePushdownEnabled,
TupleDomain effectivePredicate,
@@ -158,7 +161,7 @@ public static ParquetPageSource createParquetPageSource(
List fields = columns.stream()
.filter(column -> column.getColumnType() == REGULAR)
- .map(column -> getParquetType(column, fileSchema, useParquetColumnNames))
+ .map(column -> getPrunedParquetType(column, fileSchema, useParquetColumnNames, pruneNestedFields))
.filter(Objects::nonNull)
.collect(toList());
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
index 88c8bb895dc27..280005788e945 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
@@ -26,6 +26,8 @@
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.io.ColumnIO;
@@ -36,17 +38,21 @@
import parquet.io.ParquetDecodingException;
import parquet.io.PrimitiveColumnIO;
import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
import parquet.schema.MessageType;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Optional.empty;
import static parquet.schema.OriginalType.DECIMAL;
import static parquet.schema.Type.Repetition.REPEATED;
@@ -214,10 +220,91 @@ public static int getFieldIndex(MessageType fileSchema, String name)
}
}
+    /**
+     * Resolves the Parquet type of {@code column} via {@link #getParquetType} and, when
+     * {@code pruneNestedFields} is set and the column carries a field set, narrows the
+     * type to those nested fields.
+     *
+     * @return the (possibly pruned) type, or {@code null} when {@code getParquetType} finds no type for the column
+     */
+    public static parquet.schema.Type getPrunedParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames, boolean pruneNestedFields)
+    {
+        parquet.schema.Type originalType = getParquetType(column, messageType, useParquetColumnNames);
+
+        // getParquetType returns null for a column it cannot resolve; pruning a null type
+        // would throw, so hand the null back and let the caller filter it out.
+        if (originalType == null || !pruneNestedFields || !column.getFieldSet().isPresent()) {
+            return originalType;
+        }
+
+        return pruneParquetType(originalType, column.getFieldSet().get());
+    }
+
+    /**
+     * Narrows {@code type} to the nested fields named by {@code requiredFields}
+     * (dot-separated paths relative to {@code type}).
+     *
+     * An empty {@code requiredFields} or a primitive {@code type} is returned unchanged.
+     * Requested fields that do not exist in the group are skipped; if none of them
+     * exist, the group is returned unpruned.
+     */
+    private static parquet.schema.Type pruneParquetType(parquet.schema.Type type, Set<String> requiredFields)
+    {
+        if (requiredFields.isEmpty() || type.isPrimitive()) {
+            return type;
+        }
+
+        GroupType groupType = type.asGroupType();
+
+        List<parquet.schema.Type> newFields = groupFields(requiredFields).entrySet().stream()
+                .map(entry -> {
+                    // the lookup yields null when the file's schema lacks the requested field
+                    parquet.schema.Type child = findParquetTypeByName(groupType, entry.getKey());
+                    return child == null ? null : pruneParquetType(child, entry.getValue());
+                })
+                .filter(child -> child != null)
+                .collect(toImmutableList());
+
+        if (newFields.isEmpty()) {
+            // nothing requested exists in this group: keep it intact instead of building a group with no fields
+            return type;
+        }
+
+        return groupType.withNewFields(newFields);
+    }
+
+    /**
+     * Looks up the child of {@code groupType} called {@code name}; when that fails and
+     * {@code name} ends with an underscore, retries with the underscore removed.
+     *
+     * @return the matching type, or {@code null} if neither lookup succeeds
+     */
+    private static parquet.schema.Type findParquetTypeByName(GroupType groupType, String name)
+    {
+        parquet.schema.Type match = getParquetTypeByName(groupType, name);
+        if (match != null || !name.endsWith("_")) {
+            return match;
+        }
+
+        // when a parquet field is a hive keyword, hive appends an _ to it, so a failed
+        // name-based lookup is repeated with that suffix stripped off again
+        String stripped = name.substring(0, name.length() - 1);
+        return getParquetTypeByName(groupType, stripped);
+    }
+
+    /**
+     * Returns the field of {@code groupType} whose name equals {@code fieldName}, falling
+     * back to the first field whose name matches ignoring case.
+     *
+     * @return the matching type, or {@code null} if there is none
+     */
+    private static parquet.schema.Type getParquetTypeByName(GroupType groupType, String fieldName)
+    {
+        // exact match first
+        if (groupType.containsField(fieldName)) {
+            return groupType.getType(fieldName);
+        }
+
+        // otherwise the first case-insensitive match, in field order
+        return groupType.getFields().stream()
+                .filter(candidate -> candidate.getName().equalsIgnoreCase(fieldName))
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Groups dot-separated field paths by their first segment.
+     *
+     * Each key is a first segment; its value holds the remaining path suffixes requested
+     * under it. An empty value set means the whole field was requested, and later
+     * sub-paths of that field are then ignored.
+     */
+    private static Map<String, Set<String>> groupFields(Set<String> requiredFields)
+    {
+        Map<String, Set<String>> fields = new HashMap<>();
+        for (String field : requiredFields) {
+            // split only at the first dot: "a.b.c" -> ["a", "b.c"]
+            String[] path = field.split("\\.", 2);
+            String fieldName = path[0];
+            Set<String> nestedField = path.length == 1 ? ImmutableSet.of() : ImmutableSet.of(path[1]);
+
+            Set<String> existing = fields.get(fieldName);
+            if (existing == null) {
+                fields.put(fieldName, new HashSet<>(nestedField));
+            }
+            else if (!existing.isEmpty()) {
+                // an already-empty set means the whole field is required; leave it alone
+                if (nestedField.isEmpty()) {
+                    existing.clear();
+                }
+                else {
+                    existing.addAll(nestedField);
+                }
+            }
+        }
+
+        return ImmutableMap.copyOf(fields);
+    }
+
public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
- return getParquetTypeByName(column.getName(), messageType);
+ return findParquetTypeByName(column, messageType);
}
if (column.getHiveColumnIndex() < messageType.getFieldCount()) {
@@ -226,6 +313,56 @@ public static parquet.schema.Type getParquetType(HiveColumnHandle column, Messag
return null;
}
+    /**
+     * Finds the type of {@code column} by name, returning the first match in this order:
+     * <ul>
+     * <li>direct match</li>
+     * <li>case-insensitive match</li>
+     * <li>if the name ends with _, direct match with the _ removed</li>
+     * <li>if the name ends with _, case-insensitive match with the _ removed</li>
+     * </ul>
+     *
+     * @return the matching type, or {@code null} if no lookup succeeds
+     */
+    private static parquet.schema.Type findParquetTypeByName(HiveColumnHandle column, MessageType messageType)
+    {
+        String columnName = column.getName();
+        parquet.schema.Type match = getParquetTypeByName(columnName, messageType);
+        if (match != null || !columnName.endsWith("_")) {
+            return match;
+        }
+
+        // when a parquet field is a hive keyword, hive appends an _ to it, so a failed
+        // name-based lookup is repeated with that suffix stripped off again
+        return getParquetTypeByName(columnName.substring(0, columnName.length() - 1), messageType);
+    }
+
+    /**
+     * Finds the index of the field called {@code name} in {@code fileSchema}; when
+     * {@code getFieldIndex} reports -1 and {@code name} ends with an underscore, the
+     * lookup is repeated with the underscore removed.
+     *
+     * @return the field index, or -1 if neither lookup finds the field
+     */
+    public static int findFieldIndexByName(MessageType fileSchema, String name)
+    {
+        int index = getFieldIndex(fileSchema, name);
+        if (index != -1 || !name.endsWith("_")) {
+            return index;
+        }
+
+        // when a parquet field is a hive keyword, hive appends an _ to it;
+        // drop the _ and look the name up once more
+        return getFieldIndex(fileSchema, name.substring(0, name.length() - 1));
+    }
+
+    /**
+     * Finds the child {@link ColumnIO} called {@code name} in {@code groupColumnIO}; when
+     * {@code lookupColumnByName} returns {@code null} and {@code name} ends with an
+     * underscore, the lookup is repeated with the underscore removed.
+     *
+     * @return the matching column, or {@code null} if neither lookup succeeds
+     */
+    public static ColumnIO findColumnIObyName(GroupColumnIO groupColumnIO, String name)
+    {
+        ColumnIO match = lookupColumnByName(groupColumnIO, name);
+        if (match != null || !name.endsWith("_")) {
+            return match;
+        }
+
+        String stripped = name.substring(0, name.length() - 1);
+        return lookupColumnByName(groupColumnIO, stripped);
+    }
+
public static ParquetEncoding getParquetEncoding(Encoding encoding)
{
switch (encoding) {
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
index bea9f9ddabbcc..99b6845a3a6ed 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
@@ -97,7 +97,14 @@ protected LocatedFileStatus computeNext()
if (paths.isEmpty()) {
return endOfData();
}
- remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
+ try {
+ remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
+ }
+ catch (PrestoException e) {
+ if (!e.getErrorCode().equals(HIVE_FILE_NOT_FOUND.toErrorCode())) {
+ throw e;
+ }
+ }
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java
index dd19146b5800f..9448b8a27e5bc 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java
@@ -41,6 +41,7 @@
import java.util.Properties;
import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle;
+import static com.facebook.presto.hive.HiveUtil.isLzopIndexFile;
import static com.facebook.presto.hive.HiveUtil.isSplittable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -239,6 +240,10 @@ private static Optional getPathDomain(TupleDomain effe
private static boolean pathMatchesPredicate(Optional pathDomain, String path)
{
+ if (isLzopIndexFile(new Path(path))) {
+ return false;
+ }
+
if (!pathDomain.isPresent()) {
return true;
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java
new file mode 100644
index 0000000000000..8751b5bc474f6
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.metastore.thrift.HiveCluster;
+import com.facebook.presto.hive.metastore.thrift.StaticHiveCluster;
+import com.facebook.presto.hive.metastore.thrift.StaticMetastoreConfig;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+/**
+ * Guice module that binds {@link HiveCluster} to {@link StaticHiveCluster} in singleton
+ * scope and registers {@link StaticMetastoreConfig} with the config binder.
+ */
+public class MetastoreStaticClusterModule
+        implements Module
+{
+    @Override
+    public void configure(Binder binder)
+    {
+        configBinder(binder).bindConfig(StaticMetastoreConfig.class);
+        binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java
new file mode 100644
index 0000000000000..5459eca553054
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.metastore.thrift.HiveCluster;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+/**
+ * Guice module that binds {@link HiveCluster} to {@link ZookeeperServersetHiveCluster} in
+ * singleton scope and registers {@link ZookeeperServersetMetastoreConfig} with the config binder.
+ */
+public class MetastoreZkDiscoveryBasedModule
+        implements Module
+{
+    @Override
+    public void configure(Binder binder)
+    {
+        configBinder(binder).bindConfig(ZookeeperServersetMetastoreConfig.class);
+        binder.bind(HiveCluster.class).to(ZookeeperServersetHiveCluster.class).in(Scopes.SINGLETON);
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java
new file mode 100644
index 0000000000000..270e313e75114
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.authentication.HiveMetastoreAuthentication;
+import com.facebook.presto.hive.metastore.thrift.HiveMetastoreClient;
+import com.facebook.presto.hive.metastore.thrift.ThriftHiveMetastoreClient;
+import com.facebook.presto.twitter.hive.util.PooledTTransportFactory;
+import com.facebook.presto.twitter.hive.util.TTransportPool;
+import com.google.common.net.HostAndPort;
+import io.airlift.units.Duration;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates {@link HiveMetastoreClient}s on top of Thrift transports borrowed from a
+ * {@link TTransportPool} owned by this factory.
+ */
+public class PooledHiveMetastoreClientFactory
+{
+    private final HostAndPort socksProxy;
+    private final int timeoutMillis;
+    private final HiveMetastoreAuthentication metastoreAuthentication;
+    private final TTransportPool transportPool;
+
+    /**
+     * @param socksProxy SOCKS proxy passed on to the transport factory; may be {@code null}
+     * @param timeout metastore timeout, converted to whole milliseconds
+     * @param metastoreAuthentication authentication passed on to the transport factory
+     * @param maxTransport used as both the maximum idle and the maximum total pool size
+     * @param idleTimeout minimum idle time in milliseconds before a transport may be evicted
+     * @param transportEvictInterval time in milliseconds between eviction runs
+     * @param evictNumTests number of pooled transports examined per eviction run
+     */
+    public PooledHiveMetastoreClientFactory(@Nullable HostAndPort socksProxy,
+            Duration timeout,
+            HiveMetastoreAuthentication metastoreAuthentication,
+            int maxTransport,
+            long idleTimeout,
+            long transportEvictInterval,
+            int evictNumTests)
+    {
+        this.socksProxy = socksProxy;
+        this.timeoutMillis = toIntExact(timeout.toMillis());
+        this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null");
+        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+        poolConfig.setMaxIdle(maxTransport);
+        poolConfig.setMaxTotal(maxTransport);
+        poolConfig.setMinEvictableIdleTimeMillis(idleTimeout);
+        poolConfig.setTimeBetweenEvictionRunsMillis(transportEvictInterval);
+        poolConfig.setNumTestsPerEvictionRun(evictNumTests);
+        this.transportPool = new TTransportPool(poolConfig);
+    }
+
+    @Inject
+    public PooledHiveMetastoreClientFactory(HiveClientConfig config,
+            ZookeeperServersetMetastoreConfig zkConfig,
+            HiveMetastoreAuthentication metastoreAuthentication)
+    {
+        this(config.getMetastoreSocksProxy(),
+                config.getMetastoreTimeout(),
+                metastoreAuthentication,
+                zkConfig.getMaxTransport(),
+                zkConfig.getTransportIdleTimeout(),
+                zkConfig.getTransportEvictInterval(),
+                zkConfig.getTransportEvictNumTests());
+    }
+
+    /**
+     * Returns a metastore client for {@code host:port} backed by a transport borrowed from the pool.
+     *
+     * @throws TTransportException wrapping any failure raised while obtaining the transport
+     */
+    public HiveMetastoreClient create(String host, int port)
+            throws TTransportException
+    {
+        try {
+            TTransport transport = transportPool.borrowObject(host, port);
+            if (transport == null) {
+                // nothing came back from the plain borrow: borrow again, supplying a factory for this endpoint
+                transport = transportPool.borrowObject(host, port,
+                        new PooledTTransportFactory(transportPool,
+                                host, port, socksProxy,
+                                timeoutMillis, metastoreAuthentication));
+            }
+            return new ThriftHiveMetastoreClient(transport);
+        }
+        catch (Exception e) {
+            // chain the caught exception itself: e.getCause() may be null and would discard e's stack trace
+            throw new TTransportException(String.format("%s: %s", host, e.getMessage()), e);
+        }
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
new file mode 100644
index 0000000000000..cf8df021c1a1b
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.google.common.net.HostAndPort;
+import io.airlift.log.Logger;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Watches the children of a ZooKeeper path through a Curator {@link PathChildrenCache} and
+ * keeps a map from child node name to the {@link HostAndPort} parsed from that node's JSON data.
+ */
+public class ZookeeperMetastoreMonitor
+        implements PathChildrenCacheListener
+{
+    public static final Logger log = Logger.get(ZookeeperMetastoreMonitor.class);
+
+    private final CuratorFramework client;
+    private final PathChildrenCache cache;
+    private final ConcurrentMap<String, HostAndPort> servers = new ConcurrentHashMap<>(); // (Node_Name->HostAndPort)
+
+    /**
+     * Connects to ZooKeeper and starts watching the children of {@code watchPath}.
+     *
+     * @param zkServer ZooKeeper connection string handed to the Curator client
+     * @param watchPath path whose children are cached and watched
+     * @param maxRetries retry count for the exponential-backoff retry policy
+     * @param retrySleepTime base sleep time for the exponential-backoff retry policy
+     * @throws RuntimeException if the path cache cannot be started
+     */
+    public ZookeeperMetastoreMonitor(String zkServer, String watchPath, int maxRetries, int retrySleepTime)
+            throws Exception
+    {
+        client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(retrySleepTime, maxRetries));
+        client.start();
+
+        cache = new PathChildrenCache(client, watchPath, true); // true indicating cache node contents in addition to the stat
+        // Register the listener before starting the cache so that no child event can be
+        // delivered before the listener (and the servers map it fills) is in place.
+        cache.getListenable().addListener(this);
+        try {
+            cache.start();
+        }
+        catch (Exception ex) {
+            // do not leak the already-started client when construction fails
+            close();
+            throw new RuntimeException("Curator PathCache Creation failed: " + ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Closes the path cache first and then the Curator client the cache depends on.
+     */
+    public void close()
+    {
+        try {
+            cache.close();
+        }
+        catch (IOException ex) {
+            log.warn(ex, "failed to close metastore path cache");
+        }
+        finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Returns a new list holding the server addresses known at the time of the call.
+     */
+    public List<HostAndPort> getServers()
+    {
+        return servers.values().stream().collect(Collectors.toList());
+    }
+
+    /**
+     * Parses node data shaped like a JSON object with a "serviceEndpoint" member that
+     * itself holds "host" and "port" members.
+     *
+     * @throws IllegalArgumentException if the data is absent or holds no usable host and port
+     */
+    private HostAndPort deserialize(byte[] bytes)
+    {
+        String serviceEndpoint = "serviceEndpoint";
+        Object data = bytes == null ? null : JSONValue.parse(new String(bytes, UTF_8));
+        if (data instanceof JSONObject) {
+            Object endpoint = ((JSONObject) data).get(serviceEndpoint);
+            if (endpoint instanceof Map) {
+                Map<?, ?> hostPortMap = (Map<?, ?>) endpoint;
+                Object host = hostPortMap.get("host");
+                Object port = hostPortMap.get("port");
+                if (host != null && port != null) {
+                    return HostAndPort.fromParts(host.toString(), Integer.parseInt(port.toString()));
+                }
+            }
+        }
+        log.warn("failed to deserialize child node data");
+        throw new IllegalArgumentException("No host:port found");
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+            throws Exception
+    {
+        switch (event.getType()) {
+            case CHILD_ADDED:
+            case CHILD_UPDATED: {
+                HostAndPort hostPort = deserialize(event.getData().getData());
+                String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+                log.info("child updated: " + node + ": " + hostPort);
+                servers.put(node, hostPort);
+                break;
+            }
+
+            case CHILD_REMOVED: {
+                String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+                log.info("child removed: " + node);
+                servers.remove(node);
+                break;
+            }
+
+            default:
+                log.info("connection state changed: " + event.getType());
+                break;
+        }
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java
new file mode 100644
index 0000000000000..c91912f0ff916
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.metastore.thrift.HiveCluster;
+import com.facebook.presto.hive.metastore.thrift.HiveMetastoreClient;
+import com.google.common.net.HostAndPort;
+import io.airlift.log.Logger;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link HiveCluster} whose metastore endpoints are supplied by a {@link ZookeeperMetastoreMonitor}.
+ */
+public class ZookeeperServersetHiveCluster
+        implements HiveCluster
+{
+    private static final Logger log = Logger.get(ZookeeperServersetHiveCluster.class);
+    private final PooledHiveMetastoreClientFactory clientFactory;
+    private final ZookeeperMetastoreMonitor zkMetastoreMonitor;
+
+    @Inject
+    public ZookeeperServersetHiveCluster(ZookeeperServersetMetastoreConfig config, PooledHiveMetastoreClientFactory clientFactory)
+            throws Exception
+    {
+        String zkServerHostAndPort = requireNonNull(config.getZookeeperServerHostAndPort(), "zkServerHostAndPort is null");
+        String zkMetastorePath = requireNonNull(config.getZookeeperMetastorePath(), "zkMetastorePath is null");
+        // the retry settings are primitive ints and can never be null, so they need no null check
+        int zkRetries = config.getZookeeperMaxRetries();
+        int zkRetrySleepTime = config.getZookeeperRetrySleepTime();
+        this.clientFactory = requireNonNull(clientFactory, "clientFactory is null");
+        this.zkMetastoreMonitor = new ZookeeperMetastoreMonitor(zkServerHostAndPort, zkMetastorePath, zkRetries, zkRetrySleepTime);
+    }
+
+    /**
+     * Tries the servers currently reported by the monitor in random order and returns a
+     * client for the first one the factory can connect to.
+     *
+     * @throws RuntimeException if no server is known or every connection attempt fails
+     */
+    @Override
+    public HiveMetastoreClient createMetastoreClient()
+    {
+        // copy so the shuffle does not depend on the monitor returning a mutable list
+        List<HostAndPort> metastores = new ArrayList<>(zkMetastoreMonitor.getServers());
+        Collections.shuffle(metastores);
+        TTransportException lastException = null;
+        for (HostAndPort metastore : metastores) {
+            try {
+                log.info("Connecting to metastore at: %s", metastore.toString());
+                return clientFactory.create(metastore.getHost(), metastore.getPort());
+            }
+            catch (TTransportException e) {
+                log.debug("Failed connecting to Hive metastore at: %s", metastore.toString());
+                lastException = e;
+            }
+        }
+
+        throw new RuntimeException("Failed connecting to Hive metastore.", lastException);
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
new file mode 100644
index 0000000000000..65b424b6c437e
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import javax.validation.constraints.Min;
+
+/**
+ * Configuration of the ZooKeeper endpoint, path and retry policy used to find Hive
+ * metastores, plus the transport limit, idle-timeout and eviction settings.
+ */
+public class ZookeeperServersetMetastoreConfig
+{
+    private String zookeeperServerHostAndPort;
+    private String zookeeperMetastorePath;
+    private int zookeeperRetrySleepTime = 500; // ms
+    private int zookeeperMaxRetries = 3;
+    private int maxTransport = 128;
+    private long transportIdleTimeout = 300_000L;
+    private long transportEvictInterval = 10_000L;
+    private int transportEvictNumTests = 3;
+
+    public String getZookeeperServerHostAndPort()
+    {
+        return zookeeperServerHostAndPort;
+    }
+
+    @Config("hive.metastore.zookeeper.uri")
+    @ConfigDescription("Zookeeper Host and Port")
+    public ZookeeperServersetMetastoreConfig setZookeeperServerHostAndPort(String zookeeperServerHostAndPort)
+    {
+        this.zookeeperServerHostAndPort = zookeeperServerHostAndPort;
+        return this;
+    }
+
+    public String getZookeeperMetastorePath()
+    {
+        return zookeeperMetastorePath;
+    }
+
+    @Config("hive.metastore.zookeeper.path")
+    @ConfigDescription("Hive metastore Zookeeper path")
+    public ZookeeperServersetMetastoreConfig setZookeeperMetastorePath(String zkPath)
+    {
+        this.zookeeperMetastorePath = zkPath;
+        return this;
+    }
+
+    // a primitive int is never null, so a not-null constraint validates nothing; reject negative sleep times instead
+    @Min(0)
+    public int getZookeeperRetrySleepTime()
+    {
+        return zookeeperRetrySleepTime;
+    }
+
+    @Config("hive.metastore.zookeeper.retry.sleeptime")
+    @ConfigDescription("Zookeeper sleep time between retries")
+    public ZookeeperServersetMetastoreConfig setZookeeperRetrySleepTime(int zookeeperRetrySleepTime)
+    {
+        this.zookeeperRetrySleepTime = zookeeperRetrySleepTime;
+        return this;
+    }
+
+    @Min(1)
+    public int getZookeeperMaxRetries()
+    {
+        return zookeeperMaxRetries;
+    }
+
+    @Config("hive.metastore.zookeeper.max.retries")
+    @ConfigDescription("Zookeeper max retries")
+    public ZookeeperServersetMetastoreConfig setZookeeperMaxRetries(int zookeeperMaxRetries)
+    {
+        this.zookeeperMaxRetries = zookeeperMaxRetries;
+        return this;
+    }
+
+    @Min(1)
+    public int getMaxTransport()
+    {
+        return maxTransport;
+    }
+
+    @Config("hive.metastore.max-transport-num")
+    public ZookeeperServersetMetastoreConfig setMaxTransport(int maxTransport)
+    {
+        this.maxTransport = maxTransport;
+        return this;
+    }
+
+    public long getTransportIdleTimeout()
+    {
+        return transportIdleTimeout;
+    }
+
+    @Config("hive.metastore.transport-idle-timeout")
+    public ZookeeperServersetMetastoreConfig setTransportIdleTimeout(long transportIdleTimeout)
+    {
+        this.transportIdleTimeout = transportIdleTimeout;
+        return this;
+    }
+
+    public long getTransportEvictInterval()
+    {
+        return transportEvictInterval;
+    }
+
+    @Config("hive.metastore.transport-eviction-interval")
+    public ZookeeperServersetMetastoreConfig setTransportEvictInterval(long transportEvictInterval)
+    {
+        this.transportEvictInterval = transportEvictInterval;
+        return this;
+    }
+
+    @Min(0)
+    public int getTransportEvictNumTests()
+    {
+        return transportEvictNumTests;
+    }
+
+    @Config("hive.metastore.transport-eviction-num-tests")
+    public ZookeeperServersetMetastoreConfig setTransportEvictNumTests(int transportEvictNumTests)
+    {
+        this.transportEvictNumTests = transportEvictNumTests;
+        return this;
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdGroup.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdGroup.java
new file mode 100644
index 0000000000000..6b43f7244889f
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdGroup.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.facebook.presto.spi.type.Type;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.facebook.presto.hive.HiveUtil.isStructuralType;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Maps.immutableEntry;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Tree of thrift field ids: each id at this level maps to the group describing the
+ * nested fields requested beneath it.
+ */
+public class HiveThriftFieldIdGroup
+{
+    private final Map<Short, HiveThriftFieldIdGroup> fields;
+
+    public HiveThriftFieldIdGroup(Map<Short, HiveThriftFieldIdGroup> fields)
+    {
+        this.fields = requireNonNull(fields, "fields is null");
+    }
+
+    /**
+     * Returns the nested group registered for {@code thriftId}, or {@code null} if there is none.
+     */
+    public HiveThriftFieldIdGroup getFieldIdGroup(short thriftId)
+    {
+        return fields.get(thriftId);
+    }
+
+    /**
+     * Returns an immutable copy of the thrift ids present at this level.
+     */
+    public Set<Short> getFieldIds()
+    {
+        return ImmutableSet.copyOf(fields.keySet());
+    }
+
+    /**
+     * Builds the id group for {@code type} from dot-separated required field paths.
+     * An empty group is returned when nothing is required or the type is not structural.
+     *
+     * NOTE(review): field names are taken from Type::getDisplayName of the type parameters;
+     * confirm this yields the field names the required paths use.
+     */
+    public static HiveThriftFieldIdGroup create(Type type, Set<String> requiredFields, ThriftFieldIdResolver fieldIdResolver)
+    {
+        if (requiredFields.isEmpty() || (!isStructuralType(type))) {
+            return new HiveThriftFieldIdGroup(ImmutableMap.of());
+        }
+
+        Map<String, Set<String>> fields = groupFields(requiredFields);
+
+        List<String> fieldNames = type.getTypeParameters().stream()
+                .map(Type::getDisplayName)
+                .collect(toImmutableList());
+
+        return new HiveThriftFieldIdGroup(fields.entrySet().stream()
+                .filter(entry -> fieldNames.contains(entry.getKey()))
+                .map(entry -> immutableEntry(fieldNames.indexOf(entry.getKey()), entry.getValue()))
+                .collect(toImmutableMap(entry -> fieldIdResolver.getThriftId(entry.getKey()),
+                        entry -> create(type.getTypeParameters().get(entry.getKey()),
+                                entry.getValue(), fieldIdResolver.getNestedResolver(entry.getKey())))));
+    }
+
+    /**
+     * Groups dot-separated paths by their first segment. Each value holds the remaining
+     * path suffixes; an empty set means the whole field is required.
+     */
+    private static Map<String, Set<String>> groupFields(Set<String> requiredFields)
+    {
+        Map<String, Set<String>> fields = new HashMap<>();
+        for (String field : requiredFields) {
+            String[] path = field.split("\\.", 2);
+            String fieldName = path[0];
+            Set<String> nestedField = path.length == 1 ? ImmutableSet.of() : ImmutableSet.of(path[1]);
+            Set<String> existing = fields.get(fieldName);
+            if (existing == null) {
+                fields.put(fieldName, new HashSet<>(nestedField));
+            }
+            else if (!existing.isEmpty()) {
+                // an already-empty set means the whole field is required; leave it alone
+                if (nestedField.isEmpty()) {
+                    existing.clear();
+                }
+                else {
+                    existing.addAll(nestedField);
+                }
+            }
+        }
+
+        return ImmutableMap.copyOf(fields);
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java
new file mode 100644
index 0000000000000..077a409fd2a2e
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Resolves the translation from contiguous hive field indexes to non-contiguous thrift field ids using a json property.
+ * Example:
+ * We have the thrift definition:
+ *
+ * struct Name {
+ * 1: string first,
+ * 2: string last
+ * }
+ * struct Person {
+ * 1: Name name,
+ * 3: string phone
+ * }
+ *
+ * Hive table for Person:
+ *
+ * +---------+-------------+----------------------------------+-----------------+
+ * | hive id | column name | type | thrift field id |
+ * +---------+-------------+----------------------------------+-----------------+
+ * | 0 | name | struct<first:string,last:string> | 1 |
+ * +---------+-------------+----------------------------------+-----------------+
+ * | 1 | phone | string | 3 |
+ * +---------+-------------+----------------------------------+-----------------+
+ *
+ * The corresponding id mapping object (a struct column carries its own thrift id under 'id') is:
+ *
+ * x = {
+ * '0': {
+ * 'id': 1,
+ * '0': 1, '1': 2
+ * },
+ * '1': 3
+ * }
+ *
+ * The json property is:
+ *
+ * {"0":{"id":1,"0":1,"1":2},"1":3}
+ */
+public class HiveThriftFieldIdResolver // resolves thrift ids and nested resolvers from a JSON tree, caching each lookup
+ implements ThriftFieldIdResolver
+{
+ private final JsonNode root; // null selects the default rule used in getThriftId/getNestedResolver
+ private final Map nestedResolvers = new HashMap<>(); // cache: hive index -> resolver built for that child node
+ private final Map thriftIds = new HashMap<>(); // cache: hive index -> thrift id; NOTE(review): plain HashMaps, confirm instances are not shared across threads
+
+ public HiveThriftFieldIdResolver(JsonNode root)
+ {
+ this.root = root;
+ }
+
+ @Override
+ public short getThriftId(int hiveIndex)
+ {
+ if (root == null) {
+ return (short) (hiveIndex + 1); // default rule: thrift id is the hive index plus one
+ }
+
+ Short thriftId = thriftIds.get(hiveIndex);
+ if (thriftId != null) {
+ return thriftId;
+ }
+ else {
+ JsonNode child = root.get(String.valueOf(hiveIndex)); // children are looked up by the decimal string of the hive index
+ checkCondition(child != null, HIVE_INVALID_METADATA, "Missed json value for hiveIndex: %s, root: %s", hiveIndex, root);
+ if (child.isNumber()) {
+ thriftId = (short) child.asInt(); // number node: the value itself is the thrift id (narrowed to short)
+ }
+ else {
+ checkCondition(child.get("id") != null, HIVE_INVALID_METADATA, "Missed id for hiveIndex: %s, root: %s", hiveIndex, root);
+ thriftId = (short) child.get("id").asInt(); // any other node must carry its thrift id in an "id" member
+ }
+ thriftIds.put(hiveIndex, thriftId);
+ return thriftId;
+ }
+ }
+
+ @Override
+ public ThriftFieldIdResolver getNestedResolver(int hiveIndex)
+ {
+ if (root == null) {
+ return this; // no mapping: the same default rule applies at every nesting level
+ }
+
+ ThriftFieldIdResolver nestedResolver = nestedResolvers.get(hiveIndex);
+ if (nestedResolver != null) {
+ return nestedResolver;
+ }
+ else {
+ JsonNode child = root.get(String.valueOf(hiveIndex));
+ checkCondition(child != null, HIVE_INVALID_METADATA, "Missed json value for hiveIndex: %s, root: %s", hiveIndex, root);
+ nestedResolver = new HiveThriftFieldIdResolver(child); // nested resolver is rooted at this field's JSON node
+ nestedResolvers.put(hiveIndex, nestedResolver);
+ return nestedResolver;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("root", root)
+ .add("nestedResolvers", nestedResolvers)
+ .add("thriftIds", thriftIds)
+ .toString();
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java
new file mode 100644
index 0000000000000..c1c736a9f4536
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airlift.log.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HiveThriftFieldIdResolverFactory // creates resolvers from the JSON stored in a schema property
+ implements ThriftFieldIdResolverFactory
+{
+ private static final Logger log = Logger.get(HiveThriftFieldIdResolverFactory.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ public static final String THRIFT_FIELD_ID_JSON = "thrift.field.id.json"; // schema property key read by createResolver
+ // Default resolver (null JSON root): resolves every hive index to thrift id hiveIndex + 1
+ public static final ThriftFieldIdResolver HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER = new HiveThriftFieldIdResolver(null);
+
+ public ThriftFieldIdResolver createResolver(Properties schema) // NOTE(review): implements the interface method but carries no @Override
+ {
+ String jsonData = schema.getProperty(THRIFT_FIELD_ID_JSON);
+ if (jsonData == null) { // property absent: use the default resolver
+ return HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER;
+ }
+
+ try {
+ JsonNode root = objectMapper.readTree(jsonData);
+ return new HiveThriftFieldIdResolver(root);
+ }
+ catch (IOException e) { // NOTE(review): unparsable JSON is logged at debug level only, then the default resolver is returned below
+ log.debug(e, "Failed to create an optimized thrift id resolver, json string: %s, schema: %s. Will use a default resolver.", jsonData, schema);
+ }
+
+ return HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER;
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java
new file mode 100644
index 0000000000000..83047e1d953fb
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+public interface ThriftFieldIdResolver // maps hive field indexes to thrift field ids, one resolver per nesting level
+{
+ ThriftFieldIdResolver getNestedResolver(int hiveIndex); // resolver for the fields nested under the field at hiveIndex
+ short getThriftId(int hiveIndex); // thrift field id of the field at hiveIndex
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java
new file mode 100644
index 0000000000000..034308aaa569a
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import java.util.Properties;
+
+public interface ThriftFieldIdResolverFactory // creates ThriftFieldIdResolver instances
+{
+ ThriftFieldIdResolver createResolver(Properties schema); // builds a resolver from the given schema properties
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java
new file mode 100644
index 0000000000000..9f1a8d7d77606
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+
+import java.util.Properties;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static org.apache.hadoop.hive.serde.Constants.SERIALIZATION_CLASS;
+
+public class ThriftGeneralDeserializer // checks the configured serialization class and parses ThriftGenericRow payloads
+{
+ private static final String REQUIRED_SERIALIZATION_CLASS = ThriftGenericRow.class.getName();
+ public ThriftGeneralDeserializer(Configuration conf, Properties properties) // conf is not used by this constructor
+ { // values are passed as format arguments, so they are never interpreted as part of the format string
+ String thriftClassName = properties.getProperty(SERIALIZATION_CLASS, null);
+ checkCondition(thriftClassName != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive deserializer property: %s", SERIALIZATION_CLASS);
+ checkCondition(thriftClassName.equals(REQUIRED_SERIALIZATION_CLASS), HIVE_INVALID_METADATA, "%s %s cannot match %s", SERIALIZATION_CLASS, thriftClassName, REQUIRED_SERIALIZATION_CLASS);
+ }
+
+ public ThriftGenericRow deserialize(Writable writable, HiveThriftFieldIdGroup thriftFieldIdGroup) // field selection is delegated to ThriftGenericRow.parse
+ {
+ checkCondition(writable instanceof ThriftWritable, HIVE_UNKNOWN_ERROR, "Not an instance of ThriftWritable: %s", writable); // no per-call string concatenation on the success path
+ ThriftGenericRow row = (ThriftGenericRow) ((ThriftWritable) writable).get();
+ try {
+ row.parse(thriftFieldIdGroup);
+ }
+ catch (TException e) { // thrift decoding failure is surfaced as IllegalStateException with the cause attached
+ throw new IllegalStateException("ThriftGenericRow failed to parse values", e);
+ }
+ return row;
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java
new file mode 100644
index 0000000000000..1f9690036dbf9
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.facebook.presto.spi.PrestoException;
+import com.twitter.elephantbird.mapred.input.DeprecatedFileInputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
+import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
+import com.twitter.elephantbird.util.TypeRef;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static com.facebook.presto.hive.HiveUtil.getLzopIndexPath;
+import static com.facebook.presto.hive.HiveUtil.isLzopCompressedFile;
+import static java.lang.String.format;
+import static org.apache.hadoop.hive.serde.Constants.SERIALIZATION_CLASS;
+
+/**
+ * Mirror of com.twitter.elephantbird.mapred.input.HiveMultiInputFormat that allows passing the thriftClassName
+ * directly as a property of the JobConf and checks that an lzo index exists when deciding splittability.
+ * PR for twitter/elephant-bird:
+ * https://github.com/twitter/elephant-bird/pull/481
+ * https://github.com/twitter/elephant-bird/pull/485
+ * Remove the class once #481 is included in a release
+ */
+@SuppressWarnings("deprecation")
+public class ThriftGeneralInputFormat
+ extends DeprecatedFileInputFormatWrapper
+{
+ public ThriftGeneralInputFormat()
+ {
+ super(new MultiInputFormat()); // untyped instance; initialize() sets a class-specific one before a record reader is created
+ }
+
+ private void initialize(FileSplit split, JobConf job) throws IOException // split is not used here
+ {
+ String thriftClassName = job.get(SERIALIZATION_CLASS);
+ checkCondition(thriftClassName != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive deserializer property: %s", SERIALIZATION_CLASS);
+
+ try {
+ Class thriftClass = job.getClassByName(thriftClassName);
+ setInputFormatInstance(new MultiInputFormat(new TypeRef(thriftClass) {}));
+ }
+ catch (ClassNotFoundException e) { // chain the cause so the failing class lookup stays visible in the stack trace
+ throw new PrestoException(HIVE_INVALID_METADATA, format("Failed getting class for %s", thriftClassName), e);
+ }
+ }
+
+ @Override
+ public boolean isSplitable(FileSystem fs, Path filename)
+ {
+ if (isLzopCompressedFile(filename)) { // lzop files are splittable only when their index file exists
+ Path indexFile = getLzopIndexPath(filename);
+ try {
+ return fs.exists(indexFile);
+ }
+ catch (IOException e) { // best effort: if the index cannot be checked, treat the file as not splittable
+ return false;
+ }
+ }
+ return super.isSplitable(fs, filename);
+ }
+
+ @Override
+ public RecordReader getRecordReader(
+ InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException
+ {
+ initialize((FileSplit) split, job); // configure the wrapped format from the job before delegating
+ return super.getRecordReader(split, job, reporter);
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java
new file mode 100644
index 0000000000000..c97f857996f9e
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import io.airlift.log.Logger;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolUtil;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TType;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThriftGenericRow
+ implements TBase
+{
+ private static final Logger log = Logger.get(ThriftGenericRow.class);
+ private final Map values = new HashMap<>();
+ private byte[] buf;
+ private int off;
+ private int len;
+
+ public ThriftGenericRow() // creates a row with no values and no captured buffer
+ {
+ }
+
+ public ThriftGenericRow(Map values) // creates a row pre-populated with a copy of the given entries
+ {
+ this.values.putAll(values);
+ }
+
+ public class Fields // TFieldIdEnum implementation pairing a thrift field id with a field name
+ implements TFieldIdEnum
+{
+ private final short thriftId;
+ private final String fieldName;
+
+ Fields(short thriftId, String fieldName)
+ {
+ this.thriftId = thriftId;
+ this.fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() // returns the id given at construction
+ {
+ return thriftId;
+ }
+
+ public String getFieldName() // returns the name given at construction
+ {
+ return fieldName;
+ }
+ }
+
+ public void read(TProtocol iprot) // records where this struct lies in the transport buffer without decoding it; parse() decodes later
+ throws TException
+ {
+ TTransport trans = iprot.getTransport();
+ buf = trans.getBuffer(); // NOTE(review): stores the array returned by the transport; whether it is a copy depends on the transport - confirm it stays valid until parse()
+ off = trans.getBufferPosition(); // start of the struct
+ TProtocolUtil.skip(iprot, TType.STRUCT); // advance past the whole struct
+ len = trans.getBufferPosition() - off; // length = distance the skip advanced the buffer position
+ }
+
+ public void parse(HiveThriftFieldIdGroup fieldIdGroup) // decodes the window captured by read() into values; a null fieldIdGroup decodes every field
+ throws TException
+ {
+ TMemoryInputTransport trans = new TMemoryInputTransport(buf, off, len);
+ TBinaryProtocol iprot = new TBinaryProtocol(trans); // the captured bytes are always decoded with the binary protocol
+ TField field;
+ iprot.readStructBegin();
+ while (true) {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) { // STOP marks the end of the struct's fields
+ break;
+ }
+ if (fieldIdGroup != null && fieldIdGroup.getFieldIdGroup(field.id) == null) { // group given but has no entry for this id: skip the field
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ else {
+ values.put(field.id, readElem(iprot, field.type, fieldIdGroup == null ? null : fieldIdGroup.getFieldIdGroup(field.id))); // nested values get this field's sub-group (or null)
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ private Object readElem(TProtocol iprot, byte type, HiveThriftFieldIdGroup fieldIdGroup) // reads one value of the given thrift type; fieldIdGroup is only forwarded to the container/struct readers
+ throws TException
+ {
+ switch (type) {
+ case TType.BOOL:
+ return iprot.readBool();
+ case TType.BYTE:
+ return iprot.readByte();
+ case TType.I16:
+ return iprot.readI16();
+ case TType.ENUM: // falls through: ENUM is read the same way as I32
+ case TType.I32:
+ return iprot.readI32();
+ case TType.I64:
+ return iprot.readI64();
+ case TType.DOUBLE:
+ return iprot.readDouble();
+ case TType.STRING:
+ return iprot.readString();
+ case TType.STRUCT:
+ return readStruct(iprot, fieldIdGroup);
+ case TType.LIST:
+ return readList(iprot, fieldIdGroup);
+ case TType.SET:
+ return readSet(iprot, fieldIdGroup);
+ case TType.MAP:
+ return readMap(iprot, fieldIdGroup);
+ default: // any other type: consume its bytes and yield null
+ TProtocolUtil.skip(iprot, type);
+ return null;
+ }
+ }
+
+ private Object readStruct(TProtocol iprot, HiveThriftFieldIdGroup fieldIdGroup) // reads a nested struct as its own ThriftGenericRow
+ throws TException
+ {
+ ThriftGenericRow elem = new ThriftGenericRow();
+ elem.read(iprot); // capture the nested struct's buffer window and advance past it
+ elem.parse(fieldIdGroup); // decode it immediately using the nested field group
+ return elem;
+ }
+
+ private Object readList(TProtocol iprot, HiveThriftFieldIdGroup fieldIdGroup)
+ throws TException
+ {
+ TList ilist = iprot.readListBegin();
+ List
diff --git a/presto-jmx/pom.xml b/presto-jmx/pom.xml
index 87d63c976f031..19cef1cb9ae1e 100644
--- a/presto-jmx/pom.xml
+++ b/presto-jmx/pom.xml
@@ -4,7 +4,7 @@
com.facebook.presto
presto-root
- 0.208
+ 0.208-tw-0.53
presto-jmx
diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml
index e971cf7561c1d..dd8c20437276a 100644
--- a/presto-kafka/pom.xml
+++ b/presto-kafka/pom.xml
@@ -5,7 +5,7 @@
com.facebook.presto
presto-root
- 0.208
+ 0.208-tw-0.53
presto-kafka
diff --git a/presto-kafka07/pom.xml b/presto-kafka07/pom.xml
new file mode 100644
index 0000000000000..53363bd92c967
--- /dev/null
+++ b/presto-kafka07/pom.xml
@@ -0,0 +1,244 @@
+
+
+ 4.0.0
+
+
+ com.facebook.presto
+ presto-root
+ 0.208-tw-0.53
+
+
+ presto-kafka07
+ Presto - Kafka Connector for ver0.7
+ presto-plugin
+
+
+ ${project.parent.basedir}
+
+
+ true
+
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ configuration
+
+
+
+ com.facebook.presto
+ presto-record-decoder
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ com.twitter
+ rosette-kafka_2.11
+ 0.7.2-21
+
+
+ jsr305
+ com.google.code.findbugs
+
+
+ zookeeper
+ org.apache.zookeeper
+
+
+ finagle-ostrich4_2.10
+ com.twitter
+
+
+ commons-lang
+ commons-lang
+
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ joda-time
+ joda-time
+
+
+
+ org.scala-lang
+ scala-library
+ 2.11.7
+
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ com.facebook.presto
+ presto-main
+ test
+
+
+
+ com.facebook.presto
+ presto-tpch
+ test
+
+
+
+ com.facebook.presto
+ presto-client
+ test
+
+
+
+ com.facebook.presto
+ presto-tests
+ test
+
+
+
+ io.airlift.tpch
+ tpch
+ test
+
+
+
+ com.github.sgroschupf
+ zkclient
+ 0.1
+
+
+ log4j
+ log4j
+
+
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+ **/TestKafkaDistributed.java
+
+
+
+
+
+
+
+
+ ci
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+
+
+
+
+
+
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java
new file mode 100644
index 0000000000000..b8ec023b24014
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.DecoderColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific connector column handle.
+ */
+public final class KafkaColumnHandle
+ implements DecoderColumnHandle, Comparable
+{
+ private final String connectorId;
+ private final int ordinalPosition; // the only field used by compareTo
+
+ /**
+ * Column Name
+ */
+ private final String name;
+
+ /**
+ * Column type
+ */
+ private final Type type;
+
+ /**
+ * Mapping hint for the decoder. Can be null.
+ */
+ private final String mapping;
+
+ /**
+ * Data format to use (selects the decoder). Can be null.
+ */
+ private final String dataFormat;
+
+ /**
+ * Additional format hint for the selected decoder. Selects a decoder subtype (e.g. which timestamp decoder).
+ */
+ private final String formatHint;
+
+ /**
+ * True if the key decoder should be used, false if the message decoder should be used.
+ */
+ private final boolean keyDecoder;
+
+ /**
+ * True if the column should be hidden.
+ */
+ private final boolean hidden;
+
+ /**
+ * True if the column is internal to the connector and not defined by a topic definition.
+ */
+ private final boolean internal;
+
+ @JsonCreator
+ public KafkaColumnHandle(
+ @JsonProperty("connectorId") String connectorId,
+ @JsonProperty("ordinalPosition") int ordinalPosition,
+ @JsonProperty("name") String name,
+ @JsonProperty("type") Type type,
+ @JsonProperty("mapping") String mapping,
+ @JsonProperty("dataFormat") String dataFormat,
+ @JsonProperty("formatHint") String formatHint,
+ @JsonProperty("keyDecoder") boolean keyDecoder,
+ @JsonProperty("hidden") boolean hidden,
+ @JsonProperty("internal") boolean internal)
+
+ { // connectorId, name and type are null-checked; mapping, dataFormat and formatHint are stored as given
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.ordinalPosition = ordinalPosition;
+ this.name = requireNonNull(name, "name is null");
+ this.type = requireNonNull(type, "type is null");
+ this.mapping = mapping;
+ this.dataFormat = dataFormat;
+ this.formatHint = formatHint;
+ this.keyDecoder = keyDecoder;
+ this.hidden = hidden;
+ this.internal = internal;
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public int getOrdinalPosition()
+ {
+ return ordinalPosition;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ @JsonProperty
+ public Type getType()
+ {
+ return type;
+ }
+
+ @Override
+ @JsonProperty
+ public String getMapping()
+ {
+ return mapping;
+ }
+
+ @Override
+ @JsonProperty
+ public String getDataFormat()
+ {
+ return dataFormat;
+ }
+
+ @Override
+ @JsonProperty
+ public String getFormatHint()
+ {
+ return formatHint;
+ }
+
+ @JsonProperty
+ public boolean isKeyDecoder()
+ {
+ return keyDecoder;
+ }
+
+ @JsonProperty
+ public boolean isHidden()
+ {
+ return hidden;
+ }
+
+ @Override
+ @JsonProperty
+ public boolean isInternal()
+ {
+ return internal;
+ }
+
+ ColumnMetadata getColumnMetadata() // package-private: builds column metadata from name, type and the hidden flag
+ {
+ return new ColumnMetadata(name, type, null, hidden);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(connectorId, ordinalPosition, name, type, mapping, dataFormat, formatHint, keyDecoder, hidden, internal);
+ }
+
+ @Override
+ public boolean equals(Object obj) // compares exactly the fields that hashCode() hashes
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ KafkaColumnHandle other = (KafkaColumnHandle) obj;
+ return Objects.equals(this.connectorId, other.connectorId) &&
+ Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
+ Objects.equals(this.name, other.name) &&
+ Objects.equals(this.type, other.type) &&
+ Objects.equals(this.mapping, other.mapping) &&
+ Objects.equals(this.dataFormat, other.dataFormat) &&
+ Objects.equals(this.formatHint, other.formatHint) &&
+ Objects.equals(this.keyDecoder, other.keyDecoder) &&
+ Objects.equals(this.hidden, other.hidden) &&
+ Objects.equals(this.internal, other.internal);
+ }
+
+ @Override
+ public int compareTo(KafkaColumnHandle otherHandle) // orders by ordinalPosition only, so a result of 0 does not imply equals()
+ {
+ return Integer.compare(this.getOrdinalPosition(), otherHandle.getOrdinalPosition());
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .add("ordinalPosition", ordinalPosition)
+ .add("name", name)
+ .add("type", type)
+ .add("mapping", mapping)
+ .add("dataFormat", dataFormat)
+ .add("formatHint", formatHint)
+ .add("keyDecoder", keyDecoder)
+ .add("hidden", hidden)
+ .add("internal", internal)
+ .toString();
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java
new file mode 100644
index 0000000000000..69cfe62e87379
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific implementation of the Presto Connector SPI. This is a read only connector.
+ */
+public class KafkaConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(KafkaConnector.class);
+
+ private final LifeCycleManager lifeCycleManager; // stopped in shutdown()
+ private final KafkaMetadata metadata;
+ private final KafkaSplitManager splitManager;
+ private final KafkaRecordSetProvider recordSetProvider;
+
+ @Inject
+ public KafkaConnector(
+ LifeCycleManager lifeCycleManager,
+ KafkaMetadata metadata,
+ KafkaSplitManager splitManager,
+ KafkaRecordSetProvider recordSetProvider)
+ { // every collaborator is required; a null argument fails fast
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) // readOnly is not consulted
+ {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel); // validates the requested level against READ_COMMITTED
+ return KafkaTransactionHandle.INSTANCE; // the same handle instance is returned for every transaction
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) // the handle is ignored; one metadata instance serves all transactions
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) { // best effort: a failure to stop is logged, not rethrown
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
new file mode 100644
index 0000000000000..de5615de21676
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.HostAddress;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.configuration.Config;
+import io.airlift.units.DataSize;
+import io.airlift.units.DataSize.Unit;
+import io.airlift.units.Duration;
+import io.airlift.units.MinDuration;
+
+import javax.validation.constraints.NotNull;
+
+import java.io.File;
+import java.util.Set;
+
+import static com.google.common.collect.Iterables.transform;
+
+public class KafkaConnectorConfig
+{
+ private static final int KAFKA_DEFAULT_PORT = 9092;
+
+ /**
+ * Seed nodes for Kafka cluster. At least one must exist.
+ */
+ private Set nodes = ImmutableSet.of();
+
+ /**
+ * Timeout to connect to Kafka.
+ */
+ private Duration kafkaConnectTimeout = Duration.valueOf("10s");
+
+ /**
+ * Buffer size for connecting to Kafka.
+ */
+ private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);
+
+ /**
+ * The schema name to use in the connector.
+ */
+ private String defaultSchema = "default";
+
+ /**
+ * Set of tables known to this connector. For each table, a description file may be present in the catalog folder which describes columns for the given topic.
+ */
+ private Set tableNames = ImmutableSet.of();
+
+ /**
+ * Folder holding the JSON description files for Kafka topics.
+ */
+ private File tableDescriptionDir = new File("etc/kafka07/");
+
+ /**
+ * Whether internal columns are shown in table metadata or not. Default is no.
+ */
+ private boolean hideInternalColumns = true;
+
+ /**
+ * ZK endpoint for getting broker list
+ */
+ private String zkEndpoint = "";
+
+ /**
+ * Fetch size
+ */
+ private int fetchSize = 10 * 1024 * 1024;
+
+ @NotNull
+ public File getTableDescriptionDir()
+ {
+ return tableDescriptionDir;
+ }
+
+ @Config("kafka.table-description-dir")
+ public KafkaConnectorConfig setTableDescriptionDir(File tableDescriptionDir)
+ {
+ this.tableDescriptionDir = tableDescriptionDir;
+ return this;
+ }
+
+ @NotNull
+ public Set getTableNames()
+ {
+ return tableNames;
+ }
+
+ @Config("kafka.table-names")
+ public KafkaConnectorConfig setTableNames(String tableNames)
+ {
+ this.tableNames = ImmutableSet.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(tableNames));
+ return this;
+ }
+
+ @NotNull
+ public String getDefaultSchema()
+ {
+ return defaultSchema;
+ }
+
+ @Config("kafka.default-schema")
+ public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
+ {
+ this.defaultSchema = defaultSchema;
+ return this;
+ }
+
+ public Set getNodes()
+ {
+ return nodes;
+ }
+
+ @Config("kafka.nodes")
+ public KafkaConnectorConfig setNodes(String nodes)
+ {
+ this.nodes = (nodes == null) ? null : parseNodes(nodes);
+ return this;
+ }
+
+ @MinDuration("1s")
+ public Duration getKafkaConnectTimeout()
+ {
+ return kafkaConnectTimeout;
+ }
+
+ @Config("kafka.connect-timeout")
+ public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout)
+ {
+ this.kafkaConnectTimeout = Duration.valueOf(kafkaConnectTimeout);
+ return this;
+ }
+
+ public DataSize getKafkaBufferSize()
+ {
+ return kafkaBufferSize;
+ }
+
+ @Config("kafka.buffer-size")
+ public KafkaConnectorConfig setKafkaBufferSize(String kafkaBufferSize)
+ {
+ this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
+ return this;
+ }
+
+ public boolean isHideInternalColumns()
+ {
+ return hideInternalColumns;
+ }
+
+ @Config("kafka.hide-internal-columns")
+ public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
+ {
+ this.hideInternalColumns = hideInternalColumns;
+ return this;
+ }
+
+ @NotNull
+ public String getZkEndpoint()
+ {
+ return zkEndpoint;
+ }
+
+ @Config("kafka.zk-endpoint")
+ public KafkaConnectorConfig setZkEndpoint(String zkEndpoint)
+ {
+ this.zkEndpoint = zkEndpoint;
+ return this;
+ }
+
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ @Config("kafka.fetch-size")
+ public KafkaConnectorConfig setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ return this;
+ }
+
+ public static ImmutableSet parseNodes(String nodes)
+ {
+ Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
+ return ImmutableSet.copyOf(transform(splitter.split(nodes), KafkaConnectorConfig::toHostAddress));
+ }
+
+ private static HostAddress toHostAddress(String value)
+ {
+ return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java
new file mode 100644
index 0000000000000..b9721f0459f83
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates Kafka Connectors based off connectorId and specific configuration.
+ * <p>
+ * An optional table description supplier can be passed in; when it is absent the
+ * {@link KafkaTableDescriptionSupplier} is bound instead.
+ */
+public class KafkaConnectorFactory
+        implements ConnectorFactory
+{
+    private final Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier;
+
+    /**
+     * @param tableDescriptionSupplier explicit supplier of table descriptions, or empty to use the default binding
+     * @throws NullPointerException if the optional itself is null
+     */
+    KafkaConnectorFactory(Optional<Supplier<Map<SchemaTableName, KafkaTopicDescription>>> tableDescriptionSupplier)
+    {
+        this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
+    }
+
+    @Override
+    public String getName()
+    {
+        return "kafka07";
+    }
+
+    @Override
+    public ConnectorHandleResolver getHandleResolver()
+    {
+        return new KafkaHandleResolver();
+    }
+
+    /**
+     * Bootstraps a Guice injector for one catalog and returns its {@link KafkaConnector}.
+     *
+     * @param connectorId id of the catalog being created
+     * @param config catalog properties, passed to the bootstrap as required configuration
+     * @param context source of the type manager and node manager bindings
+     * @return the connector instance obtained from the injector
+     * @throws NullPointerException if any argument is null
+     * @throws RuntimeException wrapping any checked exception raised during bootstrap;
+     *         unchecked exceptions are rethrown unchanged
+     */
+    @Override
+    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
+    {
+        requireNonNull(connectorId, "connectorId is null");
+        requireNonNull(config, "config is null");
+        // Fail fast here instead of with an NPE inside the binder lambda below.
+        requireNonNull(context, "context is null");
+
+        try {
+            Bootstrap app = new Bootstrap(
+                    new JsonModule(),
+                    new KafkaConnectorModule(),
+                    binder -> {
+                        binder.bind(KafkaConnectorId.class).toInstance(new KafkaConnectorId(connectorId));
+                        binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+                        binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+
+                        if (tableDescriptionSupplier.isPresent()) {
+                            binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
+                        }
+                        else {
+                            binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KafkaTopicDescription>>>() {}).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
+                        }
+                    });
+
+            Injector injector = app.strictConfig()
+                    .doNotInitializeLogging()
+                    .setRequiredConfigurationProperties(config)
+                    .initialize();
+
+            return injector.getInstance(KafkaConnector.class);
+        }
+        catch (Exception e) {
+            // Replacement for the deprecated Throwables.propagate(e): rethrow unchecked
+            // exceptions as-is and wrap checked ones in a RuntimeException.
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java
new file mode 100644
index 0000000000000..3470980df0735
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Value object wrapping the id string of a connector instance. Two ids are equal when they
+ * are of the same class and wrap equal strings; {@link #toString()} returns the raw id.
+ */
+public class KafkaConnectorId
+{
+    private final String connectorId;
+
+    /**
+     * @throws NullPointerException if {@code connectorId} is null
+     */
+    public KafkaConnectorId(String connectorId)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        // connectorId is never null (enforced by the constructor), so a direct equals is safe.
+        KafkaConnectorId that = (KafkaConnectorId) obj;
+        return connectorId.equals(that.connectorId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId);
+    }
+
+    @Override
+    public String toString()
+    {
+        return connectorId;
+    }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
new file mode 100644
index 0000000000000..8afa5068eb82e
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.DecoderModule;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonBinder.jsonBinder;
+import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Guice module for the Apache Kafka connector.
+ */
+public class KafkaConnectorModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
+
+ binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
+
+ binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+
+ jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
+ jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
+
+ binder.install(new DecoderModule());
+ }
+
+ public static final class TypeDeserializer
+ extends FromStringDeserializer
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final TypeManager typeManager;
+
+ @Inject
+ public TypeDeserializer(TypeManager typeManager)
+ {
+ super(Type.class);
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ protected Type _deserialize(String value, DeserializationContext context)
+ {
+ return typeManager.getType(parseTypeSignature(value));
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
new file mode 100644
index 0000000000000..9338b98492116
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ErrorCode;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+import com.facebook.presto.spi.ErrorType;
+
+import static com.facebook.presto.spi.ErrorType.EXTERNAL;
+
+/**
+ * Kafka connector specific error codes.
+ * <p>
+ * Each constant's numeric code is offset by {@code 0x0102_0000} and carries the constant's
+ * own name (presumably the connector's reserved error-code range; confirm against the SPI).
+ */
+public enum KafkaErrorCode
+        implements ErrorCodeSupplier
+{
+    KAFKA_SPLIT_ERROR(0, EXTERNAL);
+
+    private final ErrorCode errorCode;
+
+    /**
+     * @param code connector-local code, added to the {@code 0x0102_0000} offset
+     * @param type error type reported with the code
+     */
+    KafkaErrorCode(int code, ErrorType type)
+    {
+        this.errorCode = new ErrorCode(0x0102_0000 + code, name(), type);
+    }
+
+    @Override
+    public ErrorCode toErrorCode()
+    {
+        return errorCode;
+    }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java
new file mode 100644
index 0000000000000..539b082701298
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific {@link com.facebook.presto.spi.ConnectorHandleResolver} implementation.
+ * <p>
+ * Reports the concrete Kafka handle classes and offers package-private helpers that
+ * downcast SPI handles to those classes after checking their runtime type.
+ */
+public class KafkaHandleResolver
+        implements ConnectorHandleResolver
+{
+    @Override
+    public Class<? extends ConnectorTableHandle> getTableHandleClass()
+    {
+        return KafkaTableHandle.class;
+    }
+
+    @Override
+    public Class<? extends ColumnHandle> getColumnHandleClass()
+    {
+        return KafkaColumnHandle.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorSplit> getSplitClass()
+    {
+        return KafkaSplit.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+    {
+        return KafkaTableLayoutHandle.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
+    {
+        return KafkaTransactionHandle.class;
+    }
+
+    /**
+     * Downcasts a table handle.
+     *
+     * @throws NullPointerException if {@code tableHandle} is null
+     * @throws IllegalArgumentException if it is not a {@link KafkaTableHandle}
+     */
+    static KafkaTableHandle convertTableHandle(ConnectorTableHandle tableHandle)
+    {
+        requireNonNull(tableHandle, "tableHandle is null");
+        checkArgument(tableHandle instanceof KafkaTableHandle, "tableHandle is not an instance of KafkaTableHandle");
+        return (KafkaTableHandle) tableHandle;
+    }
+
+    /**
+     * Downcasts a column handle.
+     *
+     * @throws NullPointerException if {@code columnHandle} is null
+     * @throws IllegalArgumentException if it is not a {@link KafkaColumnHandle}
+     */
+    static KafkaColumnHandle convertColumnHandle(ColumnHandle columnHandle)
+    {
+        requireNonNull(columnHandle, "columnHandle is null");
+        checkArgument(columnHandle instanceof KafkaColumnHandle, "columnHandle is not an instance of KafkaColumnHandle");
+        return (KafkaColumnHandle) columnHandle;
+    }
+
+    /**
+     * Downcasts a split.
+     *
+     * @throws NullPointerException if {@code split} is null
+     * @throws IllegalArgumentException if it is not a {@link KafkaSplit}
+     */
+    static KafkaSplit convertSplit(ConnectorSplit split)
+    {
+        requireNonNull(split, "split is null");
+        checkArgument(split instanceof KafkaSplit, "split is not an instance of KafkaSplit");
+        return (KafkaSplit) split;
+    }
+
+    /**
+     * Downcasts a table layout handle.
+     *
+     * @throws NullPointerException if {@code layout} is null
+     * @throws IllegalArgumentException if it is not a {@link KafkaTableLayoutHandle}
+     */
+    static KafkaTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout)
+    {
+        requireNonNull(layout, "layout is null");
+        checkArgument(layout instanceof KafkaTableLayoutHandle, "layout is not an instance of KafkaTableLayoutHandle");
+        return (KafkaTableLayoutHandle) layout;
+    }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
new file mode 100644
index 0000000000000..fb74806e00274
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.Type;
+
+import java.util.Map;
+
+import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Arrays.stream;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+
+/**
+ * Describes an internal (managed by the connector) field which is added to each table row. The definition itself makes the
+ * column show up in the tables (the columns are hidden by default, so they must be explicitly selected), but a value is only
+ * present when the record reading code supplies one; otherwise the field will be null.
+ * Most values are assigned in the {@link com.facebook.presto.kafka.KafkaRecordSet}.
+ */
+public enum KafkaInternalFieldDescription
+{
+    /**
+     * <tt>_partition_id</tt> - Kafka partition id.
+     */
+    PARTITION_ID_FIELD("_partition_id", BigintType.BIGINT, "Partition Id"),
+
+    /**
+     * <tt>_partition_offset</tt> - The current offset of the message in the partition.
+     */
+    PARTITION_OFFSET_FIELD("_partition_offset", BigintType.BIGINT, "Offset for the message within the partition"),
+
+    /**
+     * <tt>_segment_start</tt> - Kafka start offset for the segment which contains the current message. This is per-partition.
+     */
+    SEGMENT_START_FIELD("_segment_start", BigintType.BIGINT, "Segment start offset"),
+
+    /**
+     * <tt>_segment_end</tt> - Kafka end offset for the segment which contains the current message. This is per-partition. The end offset is the first offset that is *not* in the segment.
+     */
+    SEGMENT_END_FIELD("_segment_end", BigintType.BIGINT, "Segment end offset"),
+
+    /**
+     * <tt>_segment_count</tt> - Running count of messages in a segment.
+     */
+    SEGMENT_COUNT_FIELD("_segment_count", BigintType.BIGINT, "Running message count per segment"),
+
+    /**
+     * <tt>_message_corrupt</tt> - True if the row converter could not read a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not).
+     */
+    MESSAGE_CORRUPT_FIELD("_message_corrupt", BooleanType.BOOLEAN, "Message data is corrupt"),
+
+    /**
+     * <tt>_message</tt> - Represents the full message as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable.
+     */
+    MESSAGE_FIELD("_message", createUnboundedVarcharType(), "Message text"),
+
+    /**
+     * <tt>_message_length</tt> - length in bytes of the message.
+     */
+    MESSAGE_LENGTH_FIELD("_message_length", BigintType.BIGINT, "Total number of message bytes"),
+
+    /**
+     * <tt>_key_corrupt</tt> - True if the row converter could not read a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not).
+     */
+    KEY_CORRUPT_FIELD("_key_corrupt", BooleanType.BOOLEAN, "Key data is corrupt"),
+
+    /**
+     * <tt>_key</tt> - Represents the key as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable.
+     */
+    KEY_FIELD("_key", createUnboundedVarcharType(), "Key text"),
+
+    /**
+     * <tt>_key_length</tt> - length in bytes of the key.
+     */
+    KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes"),
+
+    /**
+     * <tt>_timestamp</tt> - offset timestamp, used to narrow scan range
+     */
+    OFFSET_TIMESTAMP_FIELD("_timestamp", BigintType.BIGINT, "Offset Timestamp");
+
+    // Lookup table from column name to constant, built once from values().
+    private static final Map<String, KafkaInternalFieldDescription> BY_COLUMN_NAME =
+            stream(KafkaInternalFieldDescription.values())
+                    .collect(toImmutableMap(KafkaInternalFieldDescription::getColumnName, identity()));
+
+    /**
+     * Looks up the internal field with the given column name.
+     *
+     * @param columnName exact column name, e.g. {@code _partition_id}
+     * @return the matching constant
+     * @throws IllegalArgumentException if no internal field has that column name
+     */
+    public static KafkaInternalFieldDescription forColumnName(String columnName)
+    {
+        KafkaInternalFieldDescription description = BY_COLUMN_NAME.get(columnName);
+        checkArgument(description != null, "Unknown internal column name %s", columnName);
+        return description;
+    }
+
+    private final String columnName;
+    private final Type type;
+    private final String comment;
+
+    /**
+     * @param columnName column name; must be neither null nor empty
+     * @param type Presto type of the column; must not be null
+     * @param comment column comment used in the column metadata; must not be null
+     */
+    KafkaInternalFieldDescription(
+            String columnName,
+            Type type,
+            String comment)
+    {
+        checkArgument(!isNullOrEmpty(columnName), "name is null or is empty");
+        this.columnName = columnName;
+        this.type = requireNonNull(type, "type is null");
+        this.comment = requireNonNull(comment, "comment is null");
+    }
+
+    public String getColumnName()
+    {
+        return columnName;
+    }
+
+    public Type getType()
+    {
+        return type;
+    }
+
+    /**
+     * Builds the column handle for this internal field.
+     * NOTE(review): the three nulls and the trailing booleans are positional arguments of the
+     * KafkaColumnHandle constructor, which is not visible here; confirm their meaning there.
+     *
+     * @param connectorId id of the owning connector
+     * @param index position passed through to the handle
+     * @param hidden whether the column is hidden
+     */
+    KafkaColumnHandle getColumnHandle(String connectorId, int index, boolean hidden)
+    {
+        return new KafkaColumnHandle(connectorId,
+                index,
+                getColumnName(),
+                getType(),
+                null,
+                null,
+                null,
+                false,
+                hidden,
+                true);
+    }
+
+    /**
+     * Builds the column metadata (name, type, comment) with the given hidden flag.
+     */
+    ColumnMetadata getColumnMetadata(boolean hidden)
+    {
+        return new ColumnMetadata(columnName, type, comment, hidden);
+    }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
new file mode 100644
index 0000000000000..225c619c28aee
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.dummy.DummyRowDecoder;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.TableNotFoundException;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Marker;
+import com.facebook.presto.spi.predicate.Range;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
+
+import javax.inject.Inject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static com.facebook.presto.kafka.KafkaHandleResolver.convertColumnHandle;
+import static com.facebook.presto.kafka.KafkaHandleResolver.convertTableHandle;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manages the Kafka connector specific metadata information. The Connector provides an additional set of columns
+ * for each table that are created as hidden columns. See {@link KafkaInternalFieldDescription} for a list
+ * of per-topic additional columns.
+ */
+public class KafkaMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger log = Logger.get(KafkaMetadata.class);
+
+ private final String connectorId;
+ private final boolean hideInternalColumns;
+ private final Map tableDescriptions;
+ private final Set internalFieldDescriptions;
+
+ @Inject
+ public KafkaMetadata(
+ KafkaConnectorId connectorId,
+ KafkaConnectorConfig kafkaConnectorConfig,
+ Supplier