diff --git a/.travis.yml b/.travis.yml
index 3ec00bf9b15af..70a0fb1d28ea5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,27 +29,43 @@ dist: trusty
cache:
directories:
- $HOME/.m2/repository
+ - $HOME/.thrift
services:
- docker
+before_install:
+ - |
+ if [[ ! -e $HOME/.thrift/bin/thrift ]]; then
+ sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libboost-filesystem-dev libboost-thread-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
+ wget https://www.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz
+ tar xfz thrift-0.9.3.tar.gz
+ cd thrift-0.9.3 && ./configure --without-cpp --without-c_glib --without-python --without-ruby --without-php --without-erlang --without-go --without-nodejs -q --prefix=$HOME/.thrift
+ sudo make install > thrift_make_install.log
+ cd ..
+ fi
+ - |
+ if [[ ! -e /usr/local/bin/thrift ]]; then
+ sudo ln -s $HOME/.thrift/bin/thrift /usr/local/bin/thrift
+ fi
+
install:
- ./mvnw -v
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl $TEST_SPECIFIC_MODULES -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl $TEST_SPECIFIC_MODULES -am
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl '!presto-docs,!presto-server,!presto-server-rpm'
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT || -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT || -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl '!presto-docs,!presto-server-rpm'
fi
- |
if [[ -v HIVE_TESTS ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl presto-hive-hadoop2 -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl presto-hive-hadoop2 -am
fi
before_script:
@@ -71,15 +87,15 @@ before_script:
script:
- |
if [[ -v MAVEN_CHECKS ]]; then
- ./mvnw install -DskipTests -B -T C1 -P ci
+ ./mvnw install -DskipTests -B -T C1 -P 'ci,!twitter-modules'
fi
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P !twitter-modules -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_OTHER_MODULES
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P !twitter-modules -B -pl $TEST_OTHER_MODULES
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT ]]; then
@@ -122,7 +138,7 @@ script:
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
presto-product-tests/bin/run_on_docker.sh \
- multinode-tls -g smoke,cli,group-by,join,tls
+ multinode-tls -g smoke,group-by,join,tls
fi
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
@@ -163,8 +179,11 @@ before_cache:
- rm -rf $HOME/.m2/repository/com/facebook
notifications:
+ hipchat:
+ rooms:
+ secure: peNh1KxwlxIpFyb60S8AMvaJThgh1LsjE+Whf1rYkJalVd2wUrqBIoyDKVSueyHD01hQ06gT7rBV6Pu/QcBMR1a9BbMCjERfxLZFUAheuC2Rsb+p1c4dyvBcFUGacgW7XWKCaVYGDGxuUvb0I3Z8cR6KxhK2xi88tHiqBGVGV2yI6zzOTpWVknMfFBtn+ONU1Ob2P6trclXaDyFd4MxubULri6CQdl35eQAq/VnmR3SZOgyVu3V30MGKwI3zhSli+3VqmW0JmaDGoHN6gznM1+VqABLgmIq0P+n+r5gdZWRCorq10NZCFMhVQ8U6rQHcL7sAniYJJsC/yRt6+pjyzIF4N+LSzZ7T+FLxQqT7k/1ukNgrujLDfTpn76Mo9eYTZmfAdzbm1QKJDACwr8Slqhq1jGzcrFMHunvXhVqjOs24R+JAHblY0O9PXvv7aR29GOQWDCvD7nV5QBUr8Xz5q7ozbLqHTI+yH02Jj4EaZ+azWYdRmnr9wDBxWMYBEgOdj4pII9b298XEDB72TxA3KpLTpdLxBTR+gIk/LjJqb/wb84xUv8gPXkaXccltGd5YI90c84cX8isbzNkAylzyfF2Eyueh0XbnMHfpFqBS7qaVM0/D+UxZkU0WNJ0x7G9XJvkiq49bZz2q1KLE4XuvVnTZSSjVSUAS8RtHfwUV33c=
slack:
- secure: V5eyoGShxFoCcYJcp858vf/T6gC9KeMxL0C1EElcpZRcKBrIVZzvhek3HLHxZOxlghqnvNVsyDtU3u5orkEaAXeXj5c2dN+4XBsAB9oeN5MtQ0Z3VLAhZDqKIW1LzcXrq4DpzM0PkGhjfjum/P94/qFYk0UckPtB6a341AuYRo8=
+ secure: E7XVlbdwIdKxnr6Tk1rmCefufs1w8h4nCWz79Uh6wMma8gC7x5ChKFqwvLRJ0WUpmPS+Ng1xeTv+wmb8TMDv2X8snmht9420/TFRy9wi1aLWNJXQUveNBzn83sCS40jFi6gd9xqKawd68R84UVH3PeNhksDtDnKAblx71miwbKmLwHc1KFoLMEnaaWEg5NgFl8/UadYDvsLD44v6YDza8eYrLp3aGK8v9ewBDySHE16IHAfpteTRaU0kG/H1kvVvFdH/h/sSPfimehd51b4i3mm/nRrjJ/VSLc7p9w5FkHUECtA0N6zcytRxN6MrbhrxJ8XG3vte3KSRSFCqfgOSRM2NWcca4CtBP2V0SwrAYMo5jim6fr921lfcbUTWTSnvMYLC17QrAxoclVrgK05GjGoLgSH42UPGf3QNkqXzyueNzaLJ+KSlgwFblIQKp6WGZYSRorL0F7s50pIoqMVoebcrnB0ObK/CcE2ywS/HeTgoSkWSDSmKBsO+cmtv1yAamy9DlmgRGZlxIxdBELXtHRkQ2B6Z2QdiQU4MHiFBc/IESJbnCait4odn+oJUjehZg+b9vjCoWwVw3zNMIJhokyxO8SiyKJmbO0z1g2L/BykWGI1DQu8HkeQzO+CmNUV3AOrxDG3amL/tkB/06fyQtnYMDhUhvX64uWSaE36sYL4=
before_deploy:
- mkdir /tmp/artifacts
diff --git a/README.md b/README.md
index 800bfddc21c0f..8b572bf54111f 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Presto [](https://travis-ci.org/prestodb/presto)
+# Presto [](https://travis-ci.org/twitter-forks/presto)
Presto is a distributed SQL query engine for big data.
diff --git a/pom.xml b/pom.xml
index ee1734c961485..2256c0cab6337 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
com.facebook.presto
presto-root
- 0.203
+ 0.203-tw-0.52
pom
presto-root
@@ -28,10 +28,10 @@
- scm:git:git://github.com/facebook/presto.git
- https://github.com/facebook/presto
- 0.203
-
+ scm:git:git://github.com/twitter-forks/presto.git
+ https://github.com/twitter-forks/presto
+ 0.203-tw-0.52
+
${project.basedir}
@@ -117,6 +117,7 @@
presto-thrift-testing-server
presto-thrift-connector
presto-matching
+ presto-twitter-functions
presto-memory-context
@@ -806,6 +807,12 @@
3.6.1
+
+ org.apache.commons
+ commons-pool2
+ 2.4.2
+
+
commons-codec
commons-codec
@@ -900,6 +907,54 @@
1.1.1.7
+
+ org.apache.curator
+ curator-recipes
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-framework
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-client
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+ org.apache.curator
+ curator-test
+ 2.12.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
org.apache.zookeeper
zookeeper
@@ -923,7 +978,7 @@
com.101tec
zkclient
- 0.8
+ 0.10
log4j
@@ -933,6 +988,10 @@
org.slf4j
slf4j-log4j12
+
+ org.apache.zookeeper
+ zookeeper
+
@@ -1235,6 +1294,17 @@
+
+ twitter-modules
+
+ true
+
+
+ presto-kafka07
+ twitter-eventlistener-plugin
+ presto-twitter-server
+
+
tests-with-dependencies
io.airlift
@@ -205,6 +272,18 @@
+
+ org.apache.curator
+ curator-test
+ test
+
+
+
+ com.101tec
+ zkclient
+ test
+
+
com.facebook.presto
presto-main
@@ -253,12 +332,6 @@
test
-
- org.anarres.lzo
- lzo-hadoop
- test
-
-
com.facebook.presto
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
index 0dc5c97d74479..6a6475a07e1f5 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
@@ -99,6 +99,7 @@ public class HiveClientConfig
private boolean useParquetColumnNames;
private boolean parquetOptimizedReaderEnabled;
private boolean parquetPredicatePushdownEnabled;
+ private boolean parquetNestedFieldsProjectionPushdownEnabled;
private boolean assumeCanonicalPartitionKeys;
@@ -675,6 +676,18 @@ public HiveClientConfig setParquetPredicatePushdownEnabled(boolean parquetPredic
return this;
}
+ public boolean isParquetNestedFieldsProjectionPushdownEnabled()
+ {
+ return parquetNestedFieldsProjectionPushdownEnabled;
+ }
+
+ @Config("hive.parquet-nested-fields-projection-pushdown.enabled")
+ public HiveClientConfig setParquetNestedFieldsProjectionPushdownEnabled(boolean parquetNestedFieldsProjectionPushdownEnabled)
+ {
+ this.parquetNestedFieldsProjectionPushdownEnabled = parquetNestedFieldsProjectionPushdownEnabled;
+ return this;
+ }
+
@Deprecated
public boolean isParquetOptimizedReaderEnabled()
{
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
index b0bb04cb410a3..605c131c8c5fb 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
@@ -16,6 +16,7 @@
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.orc.DwrfPageSourceFactory;
import com.facebook.presto.hive.orc.OrcPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetMetadataStats;
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider;
import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
@@ -23,6 +24,9 @@
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.twitter.hive.thrift.HiveThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftHiveRecordCursorProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@@ -72,9 +76,11 @@ public void configure(Binder binder)
binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class, connectorId));
+ binder.bind(ThriftFieldIdResolverFactory.class).toInstance(new HiveThriftFieldIdResolverFactory());
Multibinder recordCursorProviderBinder = newSetBinder(binder, HiveRecordCursorProvider.class);
recordCursorProviderBinder.addBinding().to(ParquetRecordCursorProvider.class).in(Scopes.SINGLETON);
+ recordCursorProviderBinder.addBinding().to(ThriftHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
@@ -97,6 +103,9 @@ public void configure(Binder binder)
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
+ binder.bind(ParquetMetadataStats.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(ParquetMetadataStats.class).as(generatedNameOf(ParquetMetadataStats.class, connectorId));
+
Multibinder pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class);
pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
index 0b153e7f71814..e9f3d0ac2896e 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java
@@ -15,6 +15,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.predicate.FieldSet;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -56,15 +57,22 @@ public enum ColumnType
}
private final String name;
+ private final Optional
diff --git a/presto-jmx/pom.xml b/presto-jmx/pom.xml
index 989c08a26de91..9f3f0a345160f 100644
--- a/presto-jmx/pom.xml
+++ b/presto-jmx/pom.xml
@@ -4,7 +4,7 @@
com.facebook.presto
presto-root
- 0.203
+ 0.203-tw-0.52
presto-jmx
diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml
index edc53df7a8e8c..141470182616f 100644
--- a/presto-kafka/pom.xml
+++ b/presto-kafka/pom.xml
@@ -5,7 +5,7 @@
com.facebook.presto
presto-root
- 0.203
+ 0.203-tw-0.52
presto-kafka
diff --git a/presto-kafka07/pom.xml b/presto-kafka07/pom.xml
new file mode 100644
index 0000000000000..b00f45288803d
--- /dev/null
+++ b/presto-kafka07/pom.xml
@@ -0,0 +1,244 @@
+
+
+ 4.0.0
+
+
+ com.facebook.presto
+ presto-root
+ 0.203-tw-0.52
+
+
+ presto-kafka07
+ Presto - Kafka Connector for ver0.7
+ presto-plugin
+
+
+ ${project.parent.basedir}
+
+
+ true
+
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ configuration
+
+
+
+ com.facebook.presto
+ presto-record-decoder
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ com.twitter
+ rosette-kafka_2.11
+ 0.7.2-21
+
+
+ jsr305
+ com.google.code.findbugs
+
+
+ zookeeper
+ org.apache.zookeeper
+
+
+ finagle-ostrich4_2.10
+ com.twitter
+
+
+ commons-lang
+ commons-lang
+
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ joda-time
+ joda-time
+
+
+
+ org.scala-lang
+ scala-library
+ 2.11.7
+
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ com.facebook.presto
+ presto-main
+ test
+
+
+
+ com.facebook.presto
+ presto-tpch
+ test
+
+
+
+ com.facebook.presto
+ presto-client
+ test
+
+
+
+ com.facebook.presto
+ presto-tests
+ test
+
+
+
+ io.airlift.tpch
+ tpch
+ test
+
+
+
+ com.github.sgroschupf
+ zkclient
+ 0.1
+
+
+ log4j
+ log4j
+
+
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+ **/TestKafkaDistributed.java
+
+
+
+
+
+
+
+
+ ci
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+
+
+
+
+
+
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java
new file mode 100644
index 0000000000000..b8ec023b24014
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaColumnHandle.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.DecoderColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific connector column handle.
+ */
+public final class KafkaColumnHandle
+ implements DecoderColumnHandle, Comparable
+{
+ private final String connectorId;
+ private final int ordinalPosition;
+
+ /**
+ * Column Name
+ */
+ private final String name;
+
+ /**
+ * Column type
+ */
+ private final Type type;
+
+ /**
+ * Mapping hint for the decoder. Can be null.
+ */
+ private final String mapping;
+
+ /**
+ * Data format to use (selects the decoder). Can be null.
+ */
+ private final String dataFormat;
+
+ /**
+ * Additional format hint for the selected decoder. Selects a decoder subtype (e.g. which timestamp decoder).
+ */
+ private final String formatHint;
+
+ /**
+ * True if the key decoder should be used, false if the message decoder should be used.
+ */
+ private final boolean keyDecoder;
+
+ /**
+ * True if the column should be hidden.
+ */
+ private final boolean hidden;
+
+ /**
+ * True if the column is internal to the connector and not defined by a topic definition.
+ */
+ private final boolean internal;
+
+ @JsonCreator
+ public KafkaColumnHandle(
+ @JsonProperty("connectorId") String connectorId,
+ @JsonProperty("ordinalPosition") int ordinalPosition,
+ @JsonProperty("name") String name,
+ @JsonProperty("type") Type type,
+ @JsonProperty("mapping") String mapping,
+ @JsonProperty("dataFormat") String dataFormat,
+ @JsonProperty("formatHint") String formatHint,
+ @JsonProperty("keyDecoder") boolean keyDecoder,
+ @JsonProperty("hidden") boolean hidden,
+ @JsonProperty("internal") boolean internal)
+
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.ordinalPosition = ordinalPosition;
+ this.name = requireNonNull(name, "name is null");
+ this.type = requireNonNull(type, "type is null");
+ this.mapping = mapping;
+ this.dataFormat = dataFormat;
+ this.formatHint = formatHint;
+ this.keyDecoder = keyDecoder;
+ this.hidden = hidden;
+ this.internal = internal;
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public int getOrdinalPosition()
+ {
+ return ordinalPosition;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ @JsonProperty
+ public Type getType()
+ {
+ return type;
+ }
+
+ @Override
+ @JsonProperty
+ public String getMapping()
+ {
+ return mapping;
+ }
+
+ @Override
+ @JsonProperty
+ public String getDataFormat()
+ {
+ return dataFormat;
+ }
+
+ @Override
+ @JsonProperty
+ public String getFormatHint()
+ {
+ return formatHint;
+ }
+
+ @JsonProperty
+ public boolean isKeyDecoder()
+ {
+ return keyDecoder;
+ }
+
+ @JsonProperty
+ public boolean isHidden()
+ {
+ return hidden;
+ }
+
+ @Override
+ @JsonProperty
+ public boolean isInternal()
+ {
+ return internal;
+ }
+
+ ColumnMetadata getColumnMetadata()
+ {
+ return new ColumnMetadata(name, type, null, hidden);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(connectorId, ordinalPosition, name, type, mapping, dataFormat, formatHint, keyDecoder, hidden, internal);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ KafkaColumnHandle other = (KafkaColumnHandle) obj;
+ return Objects.equals(this.connectorId, other.connectorId) &&
+ Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
+ Objects.equals(this.name, other.name) &&
+ Objects.equals(this.type, other.type) &&
+ Objects.equals(this.mapping, other.mapping) &&
+ Objects.equals(this.dataFormat, other.dataFormat) &&
+ Objects.equals(this.formatHint, other.formatHint) &&
+ Objects.equals(this.keyDecoder, other.keyDecoder) &&
+ Objects.equals(this.hidden, other.hidden) &&
+ Objects.equals(this.internal, other.internal);
+ }
+
+ @Override
+ public int compareTo(KafkaColumnHandle otherHandle)
+ {
+ return Integer.compare(this.getOrdinalPosition(), otherHandle.getOrdinalPosition());
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .add("ordinalPosition", ordinalPosition)
+ .add("name", name)
+ .add("type", type)
+ .add("mapping", mapping)
+ .add("dataFormat", dataFormat)
+ .add("formatHint", formatHint)
+ .add("keyDecoder", keyDecoder)
+ .add("hidden", hidden)
+ .add("internal", internal)
+ .toString();
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java
new file mode 100644
index 0000000000000..69cfe62e87379
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific implementation of the Presto Connector SPI. This is a read only connector.
+ */
+public class KafkaConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(KafkaConnector.class);
+
+ private final LifeCycleManager lifeCycleManager;
+ private final KafkaMetadata metadata;
+ private final KafkaSplitManager splitManager;
+ private final KafkaRecordSetProvider recordSetProvider;
+
+ @Inject
+ public KafkaConnector(
+ LifeCycleManager lifeCycleManager,
+ KafkaMetadata metadata,
+ KafkaSplitManager splitManager,
+ KafkaRecordSetProvider recordSetProvider)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return KafkaTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
new file mode 100644
index 0000000000000..de5615de21676
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.HostAddress;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.configuration.Config;
+import io.airlift.units.DataSize;
+import io.airlift.units.DataSize.Unit;
+import io.airlift.units.Duration;
+import io.airlift.units.MinDuration;
+
+import javax.validation.constraints.NotNull;
+
+import java.io.File;
+import java.util.Set;
+
+import static com.google.common.collect.Iterables.transform;
+
+public class KafkaConnectorConfig
+{
+ private static final int KAFKA_DEFAULT_PORT = 9092;
+
+ /**
+ * Seed nodes for Kafka cluster. At least one must exist.
+ */
+ private Set nodes = ImmutableSet.of();
+
+ /**
+ * Timeout to connect to Kafka.
+ */
+ private Duration kafkaConnectTimeout = Duration.valueOf("10s");
+
+ /**
+ * Buffer size for connecting to Kafka.
+ */
+ private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);
+
+ /**
+ * The schema name to use in the connector.
+ */
+ private String defaultSchema = "default";
+
+ /**
+ * Set of tables known to this connector. For each table, a description file may be present in the catalog folder which describes columns for the given topic.
+ */
+ private Set tableNames = ImmutableSet.of();
+
+ /**
+ * Folder holding the JSON description files for Kafka topics.
+ */
+ private File tableDescriptionDir = new File("etc/kafka07/");
+
+ /**
+ * Whether internal columns are shown in table metadata or not. Default is no.
+ */
+ private boolean hideInternalColumns = true;
+
+ /**
+ * ZK endpoint for getting broker list
+ */
+ private String zkEndpoint = "";
+
+ /**
+ * Fetch size
+ */
+ private int fetchSize = 10 * 1024 * 1024;
+
+ @NotNull
+ public File getTableDescriptionDir()
+ {
+ return tableDescriptionDir;
+ }
+
+ @Config("kafka.table-description-dir")
+ public KafkaConnectorConfig setTableDescriptionDir(File tableDescriptionDir)
+ {
+ this.tableDescriptionDir = tableDescriptionDir;
+ return this;
+ }
+
+ @NotNull
+ public Set getTableNames()
+ {
+ return tableNames;
+ }
+
+ @Config("kafka.table-names")
+ public KafkaConnectorConfig setTableNames(String tableNames)
+ {
+ this.tableNames = ImmutableSet.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(tableNames));
+ return this;
+ }
+
+ @NotNull
+ public String getDefaultSchema()
+ {
+ return defaultSchema;
+ }
+
+ @Config("kafka.default-schema")
+ public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
+ {
+ this.defaultSchema = defaultSchema;
+ return this;
+ }
+
+ public Set getNodes()
+ {
+ return nodes;
+ }
+
+ @Config("kafka.nodes")
+ public KafkaConnectorConfig setNodes(String nodes)
+ {
+ this.nodes = (nodes == null) ? null : parseNodes(nodes);
+ return this;
+ }
+
+ @MinDuration("1s")
+ public Duration getKafkaConnectTimeout()
+ {
+ return kafkaConnectTimeout;
+ }
+
+ @Config("kafka.connect-timeout")
+ public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout)
+ {
+ this.kafkaConnectTimeout = Duration.valueOf(kafkaConnectTimeout);
+ return this;
+ }
+
+ public DataSize getKafkaBufferSize()
+ {
+ return kafkaBufferSize;
+ }
+
+ @Config("kafka.buffer-size")
+ public KafkaConnectorConfig setKafkaBufferSize(String kafkaBufferSize)
+ {
+ this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
+ return this;
+ }
+
+ public boolean isHideInternalColumns()
+ {
+ return hideInternalColumns;
+ }
+
+ @Config("kafka.hide-internal-columns")
+ public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
+ {
+ this.hideInternalColumns = hideInternalColumns;
+ return this;
+ }
+
+ @NotNull
+ public String getZkEndpoint()
+ {
+ return zkEndpoint;
+ }
+
+ @Config("kafka.zk-endpoint")
+ public KafkaConnectorConfig setZkEndpoint(String zkEndpoint)
+ {
+ this.zkEndpoint = zkEndpoint;
+ return this;
+ }
+
+ public int getFetchSize()
+ {
+ return fetchSize;
+ }
+
+ @Config("kafka.fetch-size")
+ public KafkaConnectorConfig setFetchSize(int fetchSize)
+ {
+ this.fetchSize = fetchSize;
+ return this;
+ }
+
+ public static ImmutableSet parseNodes(String nodes)
+ {
+ Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
+ return ImmutableSet.copyOf(transform(splitter.split(nodes), KafkaConnectorConfig::toHostAddress));
+ }
+
+ private static HostAddress toHostAddress(String value)
+ {
+ return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java
new file mode 100644
index 0000000000000..b9721f0459f83
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates Kafka Connectors based off connectorId and specific configuration.
+ */
+public class KafkaConnectorFactory
+ implements ConnectorFactory
+{
+ private final Optional>> tableDescriptionSupplier;
+
+ KafkaConnectorFactory(Optional>> tableDescriptionSupplier)
+ {
+ this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return "kafka07";
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new KafkaHandleResolver();
+ }
+
+ @Override
+ public Connector create(String connectorId, Map config, ConnectorContext context)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ requireNonNull(config, "config is null");
+
+ try {
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new KafkaConnectorModule(),
+ binder -> {
+ binder.bind(KafkaConnectorId.class).toInstance(new KafkaConnectorId(connectorId));
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+
+ if (tableDescriptionSupplier.isPresent()) {
+ binder.bind(new TypeLiteral>>() {}).toInstance(tableDescriptionSupplier.get());
+ }
+ else {
+ binder.bind(new TypeLiteral>>() {}).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
+ }
+ });
+
+ Injector injector = app.strictConfig()
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
+
+ return injector.getInstance(KafkaConnector.class);
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java
new file mode 100644
index 0000000000000..3470980df0735
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class KafkaConnectorId
+{
+ private final String connectorId;
+
+ public KafkaConnectorId(String connectorId)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ KafkaConnectorId other = (KafkaConnectorId) obj;
+ return Objects.equals(this.connectorId, other.connectorId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(connectorId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return connectorId;
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
new file mode 100644
index 0000000000000..e12e70567dc06
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.DecoderModule;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonBinder.jsonBinder;
+import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Guice module for the Apache Kafka connector.
+ */
+public class KafkaConnectorModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
+
+ binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
+
+ binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+
+ jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
+ jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
+
+ binder.install(new DecoderModule());
+
+ for (KafkaInternalFieldDescription internalFieldDescription : KafkaInternalFieldDescription.getInternalFields()) {
+ bindInternalColumn(binder, internalFieldDescription);
+ }
+ }
+
+ private static void bindInternalColumn(Binder binder, KafkaInternalFieldDescription fieldDescription)
+ {
+ Multibinder fieldDescriptionBinder = Multibinder.newSetBinder(binder, KafkaInternalFieldDescription.class);
+ fieldDescriptionBinder.addBinding().toInstance(fieldDescription);
+ }
+
+ public static final class TypeDeserializer
+ extends FromStringDeserializer
+ {
+ private static final long serialVersionUID = 1L;
+
+ private final TypeManager typeManager;
+
+ @Inject
+ public TypeDeserializer(TypeManager typeManager)
+ {
+ super(Type.class);
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ protected Type _deserialize(String value, DeserializationContext context)
+ {
+ Type type = typeManager.getType(parseTypeSignature(value));
+ checkArgument(type != null, "Unknown type %s", value);
+ return type;
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
new file mode 100644
index 0000000000000..9338b98492116
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ErrorCode;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+import com.facebook.presto.spi.ErrorType;
+
+import static com.facebook.presto.spi.ErrorType.EXTERNAL;
+
+/**
+ * Kafka connector specific error codes.
+ */
+public enum KafkaErrorCode
+ implements ErrorCodeSupplier
+{
+ KAFKA_SPLIT_ERROR(0, EXTERNAL);
+
+ private final ErrorCode errorCode;
+
+ KafkaErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0102_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java
new file mode 100644
index 0000000000000..539b082701298
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaHandleResolver.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Kafka specific {@link com.facebook.presto.spi.ConnectorHandleResolver} implementation.
+ */
+public class KafkaHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return KafkaTableHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return KafkaColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return KafkaSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return KafkaTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return KafkaTransactionHandle.class;
+ }
+
+ static KafkaTableHandle convertTableHandle(ConnectorTableHandle tableHandle)
+ {
+ requireNonNull(tableHandle, "tableHandle is null");
+ checkArgument(tableHandle instanceof KafkaTableHandle, "tableHandle is not an instance of KafkaTableHandle");
+ return (KafkaTableHandle) tableHandle;
+ }
+
+ static KafkaColumnHandle convertColumnHandle(ColumnHandle columnHandle)
+ {
+ requireNonNull(columnHandle, "columnHandle is null");
+ checkArgument(columnHandle instanceof KafkaColumnHandle, "columnHandle is not an instance of KafkaColumnHandle");
+ return (KafkaColumnHandle) columnHandle;
+ }
+
+ static KafkaSplit convertSplit(ConnectorSplit split)
+ {
+ requireNonNull(split, "split is null");
+ checkArgument(split instanceof KafkaSplit, "split is not an instance of KafkaSplit");
+ return (KafkaSplit) split;
+ }
+
+ static KafkaTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout)
+ {
+ requireNonNull(layout, "layout is null");
+ checkArgument(layout instanceof KafkaTableLayoutHandle, "layout is not an instance of KafkaTableLayoutHandle");
+ return (KafkaTableLayoutHandle) layout;
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
new file mode 100644
index 0000000000000..5bcad3d565d4d
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.DecoderColumnHandle;
+import com.facebook.presto.decoder.FieldValueProvider;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Describes an internal (managed by the connector) field which is added to each table row. The definition itself makes the row
+ * show up in the tables (the columns are hidden by default, so they must be explicitly selected) but unless the field is hooked in using the
+ * forBooleanValue/forLongValue/forBytesValue methods and the resulting FieldValueProvider is then passed into the appropriate row decoder, the fields
+ * will be null. Most values are assigned in the {@link com.facebook.presto.kafka.KafkaRecordSet}.
+ */
+public class KafkaInternalFieldDescription
+{
+ /**
+ * _partition_id - Kafka partition id.
+ */
+ public static final KafkaInternalFieldDescription PARTITION_ID_FIELD = new KafkaInternalFieldDescription("_partition_id", BigintType.BIGINT, "Partition Id");
+
+ /**
+ * _partition_offset - The current offset of the message in the partition.
+ */
+ public static final KafkaInternalFieldDescription PARTITION_OFFSET_FIELD = new KafkaInternalFieldDescription("_partition_offset", BigintType.BIGINT, "Offset for the message within the partition");
+
+ /**
+ * _segment_start - Kafka start offset for the segment which contains the current message. This is per-partition.
+ */
+ public static final KafkaInternalFieldDescription SEGMENT_START_FIELD = new KafkaInternalFieldDescription("_segment_start", BigintType.BIGINT, "Segment start offset");
+
+ /**
+ * _segment_end - Kafka end offset for the segment which contains the current message. This is per-partition. The end offset is the first offset that is *not* in the segment.
+ */
+ public static final KafkaInternalFieldDescription SEGMENT_END_FIELD = new KafkaInternalFieldDescription("_segment_end", BigintType.BIGINT, "Segment end offset");
+
+ /**
+ * _segment_count - Running count of messages in a segment.
+ */
+ public static final KafkaInternalFieldDescription SEGMENT_COUNT_FIELD = new KafkaInternalFieldDescription("_segment_count", BigintType.BIGINT, "Running message count per segment");
+
+ /**
+ * _message_corrupt - True if the row converter could not read the a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not).
+ */
+ public static final KafkaInternalFieldDescription MESSAGE_CORRUPT_FIELD = new KafkaInternalFieldDescription("_message_corrupt", BooleanType.BOOLEAN, "Message data is corrupt");
+
+ /**
+ * _message - Represents the full topic as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable.
+ */
+ public static final KafkaInternalFieldDescription MESSAGE_FIELD = new KafkaInternalFieldDescription("_message", createUnboundedVarcharType(), "Message text");
+
+ /**
+ * _message_length - length in bytes of the message.
+ */
+ public static final KafkaInternalFieldDescription MESSAGE_LENGTH_FIELD = new KafkaInternalFieldDescription("_message_length", BigintType.BIGINT, "Total number of message bytes");
+
+ /**
+ * _key_corrupt - True if the row converter could not read the a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not).
+ */
+ public static final KafkaInternalFieldDescription KEY_CORRUPT_FIELD = new KafkaInternalFieldDescription("_key_corrupt", BooleanType.BOOLEAN, "Key data is corrupt");
+
+ /**
+ * _key - Represents the key as a text column. Format is UTF-8 which may be wrong for topics. TODO: make charset configurable.
+ */
+ public static final KafkaInternalFieldDescription KEY_FIELD = new KafkaInternalFieldDescription("_key", createUnboundedVarcharType(), "Key text");
+
+ /**
+ * _key_length - length in bytes of the key.
+ */
+ public static final KafkaInternalFieldDescription KEY_LENGTH_FIELD = new KafkaInternalFieldDescription("_key_length", BigintType.BIGINT, "Total number of key bytes");
+
+ /**
+ * _timestamp - offset timestamp, used to narrow scan range
+ */
+ public static final KafkaInternalFieldDescription OFFSET_TIMESTAMP_FIELD = new KafkaInternalFieldDescription("_timestamp", BigintType.BIGINT, "Offset Timestamp");
+
+ public static Set getInternalFields()
+ {
+ return ImmutableSet.of(PARTITION_ID_FIELD, PARTITION_OFFSET_FIELD,
+ SEGMENT_START_FIELD, SEGMENT_END_FIELD, SEGMENT_COUNT_FIELD,
+ KEY_FIELD, KEY_CORRUPT_FIELD, KEY_LENGTH_FIELD,
+ MESSAGE_FIELD, MESSAGE_CORRUPT_FIELD, MESSAGE_LENGTH_FIELD, OFFSET_TIMESTAMP_FIELD);
+ }
+
+ private final String name;
+ private final Type type;
+ private final String comment;
+
+ KafkaInternalFieldDescription(
+ String name,
+ Type type,
+ String comment)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or is empty");
+ this.name = name;
+ this.type = requireNonNull(type, "type is null");
+ this.comment = requireNonNull(comment, "comment is null");
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Type getType()
+ {
+ return type;
+ }
+
+ KafkaColumnHandle getColumnHandle(String connectorId, int index, boolean hidden)
+ {
+ return new KafkaColumnHandle(connectorId,
+ index,
+ getName(),
+ getType(),
+ null,
+ null,
+ null,
+ false,
+ hidden,
+ true);
+ }
+
+ ColumnMetadata getColumnMetadata(boolean hidden)
+ {
+ return new ColumnMetadata(name, type, comment, hidden);
+ }
+
+ public FieldValueProvider forBooleanValue(boolean value)
+ {
+ return new BooleanKafkaFieldValueProvider(value);
+ }
+
+ public FieldValueProvider forLongValue(long value)
+ {
+ return new LongKafkaFieldValueProvider(value);
+ }
+
+ public FieldValueProvider forByteValue(byte[] value)
+ {
+ return new BytesKafkaFieldValueProvider(value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, type);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ KafkaInternalFieldDescription other = (KafkaInternalFieldDescription) obj;
+ return Objects.equals(this.name, other.name) &&
+ Objects.equals(this.type, other.type);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("name", name)
+ .add("type", type)
+ .toString();
+ }
+
+ public class BooleanKafkaFieldValueProvider
+ extends FieldValueProvider
+ {
+ private final boolean value;
+
+ private BooleanKafkaFieldValueProvider(boolean value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ public boolean accept(DecoderColumnHandle columnHandle)
+ {
+ return columnHandle.getName().equals(name);
+ }
+
+ @Override
+ public boolean getBoolean()
+ {
+ return value;
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return false;
+ }
+ }
+
+ public class LongKafkaFieldValueProvider
+ extends FieldValueProvider
+ {
+ private final long value;
+
+ private LongKafkaFieldValueProvider(long value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ public boolean accept(DecoderColumnHandle columnHandle)
+ {
+ return columnHandle.getName().equals(name);
+ }
+
+ @Override
+ public long getLong()
+ {
+ return value;
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return false;
+ }
+ }
+
+ public class BytesKafkaFieldValueProvider
+ extends FieldValueProvider
+ {
+ private final byte[] value;
+
+ private BytesKafkaFieldValueProvider(byte[] value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ public boolean accept(DecoderColumnHandle columnHandle)
+ {
+ return columnHandle.getName().equals(name);
+ }
+
+ @Override
+ public Slice getSlice()
+ {
+ return isNull() ? Slices.EMPTY_SLICE : Slices.wrappedBuffer(value);
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return value == null || value.length == 0;
+ }
+ }
+}
diff --git a/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
new file mode 100644
index 0000000000000..e9644a4c14711
--- /dev/null
+++ b/presto-kafka07/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.presto.decoder.dummy.DummyRowDecoder;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.TableNotFoundException;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Marker;
+import com.facebook.presto.spi.predicate.Range;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
+
+import javax.inject.Inject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static com.facebook.presto.kafka.KafkaHandleResolver.convertColumnHandle;
+import static com.facebook.presto.kafka.KafkaHandleResolver.convertTableHandle;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Manages the Kafka connector specific metadata information. The Connector provides an additional set of columns
+ * for each table that are created as hidden columns. See {@link KafkaInternalFieldDescription} for a list
+ * of per-topic additional columns.
+ */
+public class KafkaMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger log = Logger.get(KafkaMetadata.class);
+
+ private final String connectorId;
+ private final boolean hideInternalColumns;
+ private final Map tableDescriptions;
+ private final Set internalFieldDescriptions;
+
+ @Inject
+ public KafkaMetadata(
+ KafkaConnectorId connectorId,
+ KafkaConnectorConfig kafkaConnectorConfig,
+ Supplier