Skip to content

Map MySQL JSON to Presto JSON #16647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
while (resultSet.next()) {
JdbcTypeHandle typeHandle = new JdbcTypeHandle(
resultSet.getInt("DATA_TYPE"),
resultSet.getString("TYPE_NAME"),
Optional.ofNullable(resultSet.getString("TYPE_NAME")),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.facebook.presto.common.type.Decimals.readBigDecimal;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
Expand Down Expand Up @@ -169,6 +170,9 @@ else if (UuidType.UUID.equals(type)) {
Slice slice = type.getSlice(block, position);
statement.setObject(parameter, prestoUuidToJavaUuid(slice));
}
else if (JSON.equals(type)) {
statement.setString(parameter, type.getSlice(block, position).toStringUtf8());
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public final class JdbcTypeHandle
{
private final int jdbcType;
private final String jdbcTypeName;
private final Optional<String> jdbcTypeName;
private final int columnSize;
private final int decimalDigits;

@JsonCreator
public JdbcTypeHandle(
@JsonProperty("jdbcType") int jdbcType,
@JsonProperty("jdbcTypeName") String jdbcTypeName,
@JsonProperty("jdbcTypeName") Optional<String> jdbcTypeName,
@JsonProperty("columnSize") int columnSize,
@JsonProperty("decimalDigits") int decimalDigits)
{
Expand All @@ -48,7 +49,7 @@ public int getJdbcType()
}

@JsonProperty
public String getJdbcTypeName()
public Optional<String> getJdbcTypeName()
{
return jdbcTypeName;
}
Expand Down Expand Up @@ -92,7 +93,7 @@ public String toString()
{
return toStringHelper(this)
.add("jdbcType", jdbcType)
.add("jdbcTypeName", jdbcTypeName)
.add("jdbcTypeName", jdbcTypeName.orElse(null))
.add("columnSize", columnSize)
.add("decimalDigits", decimalDigits)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@
package com.facebook.presto.plugin.jdbc;

import java.sql.Types;
import java.util.Optional;

public final class TestingJdbcTypeHandle
{
private TestingJdbcTypeHandle() {}

public static final JdbcTypeHandle JDBC_BOOLEAN = new JdbcTypeHandle(Types.BOOLEAN, "boolean", 1, 0);
public static final JdbcTypeHandle JDBC_BOOLEAN = new JdbcTypeHandle(Types.BOOLEAN, Optional.of("boolean"), 1, 0);

public static final JdbcTypeHandle JDBC_SMALLINT = new JdbcTypeHandle(Types.SMALLINT, "smallint", 1, 0);
public static final JdbcTypeHandle JDBC_TINYINT = new JdbcTypeHandle(Types.TINYINT, "tinyint", 2, 0);
public static final JdbcTypeHandle JDBC_INTEGER = new JdbcTypeHandle(Types.INTEGER, "integer", 4, 0);
public static final JdbcTypeHandle JDBC_BIGINT = new JdbcTypeHandle(Types.BIGINT, "bigint", 8, 0);
public static final JdbcTypeHandle JDBC_SMALLINT = new JdbcTypeHandle(Types.SMALLINT, Optional.of("smallint"), 1, 0);
public static final JdbcTypeHandle JDBC_TINYINT = new JdbcTypeHandle(Types.TINYINT, Optional.of("tinyint"), 2, 0);
public static final JdbcTypeHandle JDBC_INTEGER = new JdbcTypeHandle(Types.INTEGER, Optional.of("integer"), 4, 0);
public static final JdbcTypeHandle JDBC_BIGINT = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), 8, 0);

public static final JdbcTypeHandle JDBC_REAL = new JdbcTypeHandle(Types.REAL, "real", 8, 0);
public static final JdbcTypeHandle JDBC_DOUBLE = new JdbcTypeHandle(Types.DOUBLE, "double precision", 8, 0);
public static final JdbcTypeHandle JDBC_REAL = new JdbcTypeHandle(Types.REAL, Optional.of("real"), 8, 0);
public static final JdbcTypeHandle JDBC_DOUBLE = new JdbcTypeHandle(Types.DOUBLE, Optional.of("double precision"), 8, 0);

public static final JdbcTypeHandle JDBC_CHAR = new JdbcTypeHandle(Types.CHAR, "char", 10, 0);
public static final JdbcTypeHandle JDBC_VARCHAR = new JdbcTypeHandle(Types.VARCHAR, "varchar", 10, 0);
public static final JdbcTypeHandle JDBC_CHAR = new JdbcTypeHandle(Types.CHAR, Optional.of("char"), 10, 0);
public static final JdbcTypeHandle JDBC_VARCHAR = new JdbcTypeHandle(Types.VARCHAR, Optional.of("varchar"), 10, 0);

public static final JdbcTypeHandle JDBC_DATE = new JdbcTypeHandle(Types.DATE, "date", 8, 0);
public static final JdbcTypeHandle JDBC_TIME = new JdbcTypeHandle(Types.TIME, "time", 4, 0);
public static final JdbcTypeHandle JDBC_TIMESTAMP = new JdbcTypeHandle(Types.TIMESTAMP, "timestamp", 8, 0);
public static final JdbcTypeHandle JDBC_DATE = new JdbcTypeHandle(Types.DATE, Optional.of("date"), 8, 0);
public static final JdbcTypeHandle JDBC_TIME = new JdbcTypeHandle(Types.TIME, Optional.of("time"), 4, 0);
public static final JdbcTypeHandle JDBC_TIMESTAMP = new JdbcTypeHandle(Types.TIMESTAMP, Optional.of("timestamp"), 8, 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ private static VariableReferenceExpression newVariable(String name, Type type)

private static JdbcColumnHandle integerJdbcColumnHandle(String name)
{
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BIGINT, "integer", 10, 0), BIGINT, false, Optional.empty());
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BIGINT, Optional.of("integer"), 10, 0), BIGINT, false, Optional.empty());
}

private static JdbcColumnHandle booleanJdbcColumnHandle(String name)
{
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BOOLEAN, "boolean", 1, 0), BOOLEAN, false, Optional.empty());
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BOOLEAN, Optional.of("boolean"), 1, 0), BOOLEAN, false, Optional.empty());
}

private static JdbcColumnHandle getColumnHandleForVariable(String name, Type type)
Expand Down
26 changes: 20 additions & 6 deletions presto-mysql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand Down Expand Up @@ -120,12 +140,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
*/
package com.facebook.presto.plugin.mysql;

import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
Expand All @@ -23,16 +27,27 @@
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.plugin.jdbc.JdbcIdentity;
import com.facebook.presto.plugin.jdbc.JdbcTableHandle;
import com.facebook.presto.plugin.jdbc.JdbcTypeHandle;
import com.facebook.presto.plugin.jdbc.ReadMapping;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.mysql.jdbc.Driver;
import com.mysql.jdbc.Statement;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;

import javax.inject.Inject;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -51,21 +66,29 @@
import static com.facebook.presto.plugin.jdbc.DriverConnectionFactory.basicConnectionProperties;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.mysql.jdbc.SQLError.SQL_STATE_ER_TABLE_EXISTS_ERROR;
import static com.mysql.jdbc.SQLError.SQL_STATE_SYNTAX_ERROR;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;

public class MySqlClient
extends BaseJdbcClient
{
private final Type jsonType;

@Inject
public MySqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, MySqlConfig mySqlConfig)
public MySqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, MySqlConfig mySqlConfig, TypeManager typeManager)
throws SQLException
{
super(connectorId, config, "`", connectionFactory(config, mySqlConfig));
this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
}

private static ConnectionFactory connectionFactory(BaseJdbcConfig config, MySqlConfig mySqlConfig)
Expand Down Expand Up @@ -155,6 +178,18 @@ protected String getTableSchemaName(ResultSet resultSet)
return resultSet.getString("TABLE_CAT");
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
String jdbcTypeName = typeHandle.getJdbcTypeName()
.orElseThrow(() -> new PrestoException(JDBC_ERROR, "Type name is missing: " + typeHandle));

if (jdbcTypeName.equalsIgnoreCase("json")) {
return Optional.of(jsonColumnMapping());
}
return super.toPrestoType(session, typeHandle);
}

@Override
protected String toSqlType(Type type)
{
Expand Down Expand Up @@ -186,6 +221,9 @@ protected String toSqlType(Type type)
}
return "longtext";
}
if (type.getTypeSignature().getBase().equals(StandardTypes.JSON)) {
return "json";
}

return super.toSqlType(type);
}
Expand Down Expand Up @@ -235,4 +273,39 @@ protected void renameTable(JdbcIdentity identity, String catalogName, SchemaTabl
// catalogName parameter to null it will be omitted in the alter table statement.
super.renameTable(identity, null, oldTable, newTable);
}

private ReadMapping jsonColumnMapping()
{
return ReadMapping.sliceReadMapping(jsonType,
(resultSet, columnIndex) -> jsonParse(utf8Slice(resultSet.getString(columnIndex))));
}

private static final JsonFactory JSON_FACTORY = new JsonFactory()
.disable(CANONICALIZE_FIELD_NAMES);

private static final ObjectMapper SORTED_MAPPER = new JsonObjectMapperProvider().get().configure(ORDER_MAP_ENTRIES_BY_KEYS, true);

private static Slice jsonParse(Slice slice)
{
try (JsonParser parser = createJsonParser(slice)) {
byte[] in = slice.getBytes();
SliceOutput dynamicSliceOutput = new DynamicSliceOutput(in.length);
SORTED_MAPPER.writeValue((OutputStream) dynamicSliceOutput, SORTED_MAPPER.readValue(parser, Object.class));
// nextToken() returns null if the input is parsed correctly,
// but will throw an exception if there are trailing characters.
parser.nextToken();
return dynamicSliceOutput.slice();
}
catch (Exception e) {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, format("Cannot convert '%s' to JSON", slice.toStringUtf8()));
}
}

private static JsonParser createJsonParser(Slice json)
throws IOException
{
// Jackson tries to detect the character encoding automatically when using InputStream
// so we pass an InputStreamReader instead.
return JSON_FACTORY.createParser(new InputStreamReader(json.getInput(), UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.tests.datatype.CreateAndInsertDataSetup;
import com.facebook.presto.tests.datatype.CreateAsSelectDataSetup;
import com.facebook.presto.tests.datatype.DataSetup;
import com.facebook.presto.tests.datatype.DataType;
import com.facebook.presto.tests.datatype.DataTypeTest;
import com.facebook.presto.tests.sql.JdbcSqlExecutor;
import com.facebook.presto.tests.sql.PrestoSqlExecutor;
Expand All @@ -36,13 +37,17 @@
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.function.Function;

import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.common.type.VarcharType.createVarcharType;
import static com.facebook.presto.plugin.mysql.MySqlQueryRunner.createMySqlQueryRunner;
import static com.facebook.presto.sql.ExpressionFormatter.formatStringLiteral;
import static com.facebook.presto.tests.datatype.DataType.bigintDataType;
import static com.facebook.presto.tests.datatype.DataType.charDataType;
import static com.facebook.presto.tests.datatype.DataType.dataType;
import static com.facebook.presto.tests.datatype.DataType.dateDataType;
import static com.facebook.presto.tests.datatype.DataType.decimalDataType;
import static com.facebook.presto.tests.datatype.DataType.doubleDataType;
Expand All @@ -57,6 +62,7 @@
import static com.google.common.base.Verify.verify;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Function.identity;

@Test
public class TestMySqlTypeMapping
Expand Down Expand Up @@ -294,4 +300,37 @@ private DataSetup mysqlCreateAndInsert(String tableNamePrefix)
JdbcSqlExecutor mysqlUnicodeExecutor = new JdbcSqlExecutor(mysqlServer.getJdbcUrl() + "&useUnicode=true&characterEncoding=utf8");
return new CreateAndInsertDataSetup(mysqlUnicodeExecutor, tableNamePrefix);
}

@Test
public void testJson()
{
jsonTestCases(jsonDataType(value -> "JSON " + formatStringLiteral(value)))
.execute(getQueryRunner(), prestoCreateAsSelect("presto_test_json"));
jsonTestCases(jsonDataType(value -> format("CAST(%s AS JSON)", formatStringLiteral(value))))
.execute(getQueryRunner(), mysqlCreateAndInsert("tpch.mysql_test_json"));
}

private DataTypeTest jsonTestCases(DataType<String> jsonDataType)
{
return DataTypeTest.create()
.addRoundTrip(jsonDataType, "{}")
.addRoundTrip(jsonDataType, null)
.addRoundTrip(jsonDataType, "null")
.addRoundTrip(jsonDataType, "123.4")
.addRoundTrip(jsonDataType, "\"abc\"")
.addRoundTrip(jsonDataType, "\"text with ' apostrophes\"")
.addRoundTrip(jsonDataType, "\"\"")
.addRoundTrip(jsonDataType, "{\"a\":1,\"b\":2}")
.addRoundTrip(jsonDataType, "{\"a\":[1,2,3],\"b\":{\"aa\":11,\"bb\":[{\"a\":1,\"b\":2},{\"a\":0}]}}")
.addRoundTrip(jsonDataType, "[]");
}

private static DataType<String> jsonDataType(Function<String, String> toLiteral)
{
return dataType(
"json",
JSON,
toLiteral,
identity());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ private String joinExpressions(List<Expression> expressions)
}
}

static String formatStringLiteral(String s)
public static String formatStringLiteral(String s)
{
s = s.replace("'", "''");
if (CharMatcher.inRange((char) 0x20, (char) 0x7E).matchesAllOf(s)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ protected String toSqlType(Type type)
@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
if (typeHandle.getJdbcTypeName().equals("jsonb") || typeHandle.getJdbcTypeName().equals("json")) {
String jdbcTypeName = typeHandle.getJdbcTypeName()
.orElseThrow(() -> new PrestoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
if (jdbcTypeName.equals("jsonb") || jdbcTypeName.equals("json")) {
return Optional.of(jsonColumnMapping());
}

Expand Down