Skip to content

Commit dd25f24

Browse files
committed
Add VARIANT_BINARY client capability for JDBC
1 parent 13fa6b2 commit dd25f24

File tree

26 files changed

+1902
-88
lines changed

26 files changed

+1902
-88
lines changed

client/trino-client/src/main/java/io/trino/client/ClientCapabilities.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
*/
1414
package io.trino.client;
1515

16+
import java.util.Set;
17+
18+
import static java.util.Arrays.stream;
19+
import static java.util.stream.Collectors.toUnmodifiableSet;
20+
1621
public enum ClientCapabilities
1722
{
1823
PATH,
@@ -39,8 +44,39 @@ public enum ClientCapabilities
3944
*/
4045
VARIANT_JSON,
4146

47+
/**
48+
* Whether client supports the `VARIANT` type encoded as a binary payload on the wire.
49+
* This capability is opt-in, so clients continue to receive the JSON representation by default.
50+
*/
51+
VARIANT_BINARY(false),
52+
4253
/**
4354
* Whether clients support the session authorization set/reset feature
4455
*/
4556
SESSION_AUTHORIZATION;
57+
58+
private final boolean enabledByDefault;
59+
60+
ClientCapabilities()
61+
{
62+
this(true);
63+
}
64+
65+
ClientCapabilities(boolean enabledByDefault)
66+
{
67+
this.enabledByDefault = enabledByDefault;
68+
}
69+
70+
public boolean enabledByDefault()
71+
{
72+
return enabledByDefault;
73+
}
74+
75+
public static Set<String> defaultClientCapabilities()
76+
{
77+
return stream(values())
78+
.filter(ClientCapabilities::enabledByDefault)
79+
.map(Enum::name)
80+
.collect(toUnmodifiableSet());
81+
}
4682
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.client;
15+
16+
import static com.google.common.base.Preconditions.checkArgument;
17+
import static java.util.Objects.requireNonNull;
18+
19+
public final class EncodedVariant
20+
{
21+
private final byte[] metadataBytes;
22+
private final byte[] valueBytes;
23+
24+
private EncodedVariant(byte[] metadataBytes, byte[] valueBytes)
25+
{
26+
this.metadataBytes = requireNonNull(metadataBytes, "metadataBytes is null");
27+
this.valueBytes = requireNonNull(valueBytes, "valueBytes is null");
28+
checkArgument(valueBytes.length > 0, "valueBytes is empty");
29+
}
30+
31+
public static EncodedVariant fromBytes(byte[] metadataBytes, byte[] valueBytes)
32+
{
33+
requireNonNull(metadataBytes, "metadataBytes is null");
34+
requireNonNull(valueBytes, "valueBytes is null");
35+
return new EncodedVariant(metadataBytes.clone(), valueBytes.clone());
36+
}
37+
38+
public byte[] getMetadataBytes()
39+
{
40+
return metadataBytes.clone();
41+
}
42+
43+
public byte[] getValueBytes()
44+
{
45+
return valueBytes.clone();
46+
}
47+
}

client/trino-client/src/main/java/io/trino/client/JsonDecodingUtils.java

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Optional;
3030

3131
import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
32+
import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
33+
import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
3234
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
3335
import static com.google.common.base.Preconditions.checkArgument;
3436
import static com.google.common.base.Verify.verify;
@@ -87,16 +89,17 @@ private JsonDecodingUtils() {}
8789
private static final RealDecoder REAL_DECODER = new RealDecoder();
8890
private static final BooleanDecoder BOOLEAN_DECODER = new BooleanDecoder();
8991
private static final StringDecoder STRING_DECODER = new StringDecoder();
90-
private static final VariantDecoder VARIANT_DECODER = new VariantDecoder();
92+
private static final VariantJsonDecoder VARIANT_JSON_DECODER = new VariantJsonDecoder();
93+
private static final VariantBinaryDecoder VARIANT_BINARY_DECODER = new VariantBinaryDecoder();
9194
private static final Base64Decoder BASE_64_DECODER = new Base64Decoder();
9295
private static final ObjectDecoder OBJECT_DECODER = new ObjectDecoder();
9396

94-
public static TypeDecoder[] createTypeDecoders(List<Column> columns)
97+
public static TypeDecoder[] createTypeDecoders(List<Column> columns, boolean supportsVariantBinary)
9598
{
9699
verify(!columns.isEmpty(), "Columns must not be empty");
97100
TypeDecoder[] decoders = new TypeDecoder[columns.size()];
98101
for (int i = 0; i < columns.size(); i++) {
99-
decoders[i] = createTypeDecoder(columns.get(i).getTypeSignature());
102+
decoders[i] = createTypeDecoder(columns.get(i).getTypeSignature(), supportsVariantBinary);
100103
}
101104
return decoders;
102105
}
@@ -107,7 +110,7 @@ Object decode(JsonParser parser)
107110
throws IOException;
108111
}
109112

110-
private static TypeDecoder createTypeDecoder(ClientTypeSignature signature)
113+
private static TypeDecoder createTypeDecoder(ClientTypeSignature signature, boolean supportsVariantBinary)
111114
{
112115
switch (signature.getRawType()) {
113116
case BIGINT:
@@ -125,13 +128,13 @@ private static TypeDecoder createTypeDecoder(ClientTypeSignature signature)
125128
case BOOLEAN:
126129
return BOOLEAN_DECODER;
127130
case VARIANT:
128-
return VARIANT_DECODER;
131+
return supportsVariantBinary ? VARIANT_BINARY_DECODER : VARIANT_JSON_DECODER;
129132
case ARRAY:
130-
return new ArrayDecoder(signature);
133+
return new ArrayDecoder(signature, supportsVariantBinary);
131134
case MAP:
132-
return new MapDecoder(signature);
135+
return new MapDecoder(signature, supportsVariantBinary);
133136
case ROW:
134-
return new RowDecoder(signature);
137+
return new RowDecoder(signature, supportsVariantBinary);
135138
case VARCHAR:
136139
case JSON:
137140
case TIME:
@@ -296,7 +299,7 @@ public Object decode(JsonParser parser)
296299
}
297300
}
298301

299-
private static class VariantDecoder
302+
private static class VariantJsonDecoder
300303
implements TypeDecoder
301304
{
302305
@Override
@@ -311,6 +314,46 @@ public Object decode(JsonParser parser)
311314
}
312315
}
313316

317+
private static class VariantBinaryDecoder
318+
implements TypeDecoder
319+
{
320+
@Override
321+
public Object decode(JsonParser parser)
322+
throws IOException
323+
{
324+
if (requireNonNull(parser.currentToken()) != START_OBJECT) {
325+
throw illegalToken(parser);
326+
}
327+
328+
byte[] metadataBytes = null;
329+
byte[] valueBytes = null;
330+
while (parser.nextToken() != END_OBJECT) {
331+
if (requireNonNull(parser.currentToken()) != FIELD_NAME) {
332+
throw illegalToken(parser);
333+
}
334+
335+
String fieldName = parser.currentName();
336+
if (parser.nextToken() != JsonToken.VALUE_STRING) {
337+
throw illegalToken(parser);
338+
}
339+
340+
switch (fieldName) {
341+
case "metadata":
342+
metadataBytes = Base64.getDecoder().decode(parser.getValueAsString());
343+
break;
344+
case "value":
345+
valueBytes = Base64.getDecoder().decode(parser.getValueAsString());
346+
break;
347+
}
348+
}
349+
350+
if (metadataBytes == null || valueBytes == null) {
351+
throw illegalToken(parser);
352+
}
353+
return EncodedVariant.fromBytes(metadataBytes, valueBytes);
354+
}
355+
}
356+
314357
private static class Base64Decoder
315358
implements TypeDecoder
316359
{
@@ -327,11 +370,11 @@ private static class ArrayDecoder
327370
{
328371
private final TypeDecoder typeDecoder;
329372

330-
public ArrayDecoder(ClientTypeSignature signature)
373+
public ArrayDecoder(ClientTypeSignature signature, boolean supportsVariantBinary)
331374
{
332375
requireNonNull(signature, "signature is null");
333376
checkArgument(signature.getRawType().equals(ARRAY), "not an array type signature: %s", signature);
334-
this.typeDecoder = createTypeDecoder(signature.getArgumentsAsTypeSignatures().get(0));
377+
this.typeDecoder = createTypeDecoder(signature.getArgumentsAsTypeSignatures().get(0), supportsVariantBinary);
335378
}
336379

337380
@Override
@@ -363,12 +406,12 @@ private static class MapDecoder
363406
private final String keyType;
364407
private final TypeDecoder valueDecoder;
365408

366-
public MapDecoder(ClientTypeSignature signature)
409+
public MapDecoder(ClientTypeSignature signature, boolean supportsVariantBinary)
367410
{
368411
requireNonNull(signature, "signature is null");
369412
checkArgument(signature.getRawType().equals(MAP), "not a map type signature: %s", signature);
370413
this.keyType = signature.getArgumentsAsTypeSignatures().get(0).getRawType();
371-
this.valueDecoder = createTypeDecoder(signature.getArgumentsAsTypeSignatures().get(1));
414+
this.valueDecoder = createTypeDecoder(signature.getArgumentsAsTypeSignatures().get(1), supportsVariantBinary);
372415
}
373416

374417
@Override
@@ -446,7 +489,7 @@ private static class RowDecoder
446489
private final TypeDecoder[] fieldDecoders;
447490
private final List<Optional<String>> fieldNames;
448491

449-
private RowDecoder(ClientTypeSignature signature)
492+
private RowDecoder(ClientTypeSignature signature, boolean supportsVariantBinary)
450493
{
451494
requireNonNull(signature, "signature is null");
452495
checkArgument(signature.getRawType().equals(ROW), "not a row type signature: %s", signature);
@@ -455,7 +498,7 @@ private RowDecoder(ClientTypeSignature signature)
455498

456499
int index = 0;
457500
for (ClientTypeSignatureParameter parameter : signature.getArguments()) {
458-
fieldDecoders[index] = createTypeDecoder(parameter.getTypeSignature());
501+
fieldDecoders[index] = createTypeDecoder(parameter.getTypeSignature(), supportsVariantBinary);
459502
fieldNames.add(parameter.getName());
460503
index++;
461504
}

client/trino-client/src/main/java/io/trino/client/JsonIterators.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ public void close()
138138
}
139139
}
140140

141-
public static CloseableIterator<List<Object>> forJsonParser(JsonParser parser, List<Column> columns)
141+
public static CloseableIterator<List<Object>> forJsonParser(JsonParser parser, List<Column> columns, boolean supportsVariantBinary)
142142
throws IOException
143143
{
144-
return new JsonIterator(parser, createTypeDecoders(columns));
144+
return new JsonIterator(parser, createTypeDecoders(columns, supportsVariantBinary));
145145
}
146146

147147
public static CloseableIterator<List<Object>> forInputStream(InputStream stream, TypeDecoder[] decoders)

client/trino-client/src/main/java/io/trino/client/QueryDataDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface QueryDataDecoder
2323
{
2424
interface Factory
2525
{
26-
QueryDataDecoder create(List<Column> columns, DataAttributes attributes);
26+
QueryDataDecoder create(List<Column> columns, DataAttributes attributes, boolean supportsVariantBinary);
2727

2828
String encoding();
2929
}

client/trino-client/src/main/java/io/trino/client/ResultRowsDecoder.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.client;
1515

16+
import com.google.common.annotations.VisibleForTesting;
1617
import io.trino.client.spooling.DataAttributes;
1718
import io.trino.client.spooling.EncodedQueryData;
1819
import io.trino.client.spooling.SegmentLoader;
@@ -38,16 +39,19 @@ public class ResultRowsDecoder
3839
implements AutoCloseable
3940
{
4041
private final SegmentLoader loader;
42+
private final boolean supportsVariantBinary;
4143
private QueryDataDecoder decoder;
4244

45+
@VisibleForTesting
4346
public ResultRowsDecoder()
4447
{
45-
this(new OkHttpSegmentLoader());
48+
this(new OkHttpSegmentLoader(), false);
4649
}
4750

48-
public ResultRowsDecoder(SegmentLoader loader)
51+
ResultRowsDecoder(SegmentLoader loader, boolean supportsVariantBinary)
4952
{
5053
this.loader = requireNonNull(loader, "loader is null");
54+
this.supportsVariantBinary = supportsVariantBinary;
5155
}
5256

5357
private void setEncoding(List<Column> columns, String encoding)
@@ -59,7 +63,7 @@ private void setEncoding(List<Column> columns, String encoding)
5963
checkState(!columns.isEmpty(), "Columns must be set when decoding data");
6064
this.decoder = QueryDataDecoders.get(encoding)
6165
// we don't use query-level attributes for now
62-
.create(columns, DataAttributes.empty());
66+
.create(columns, DataAttributes.empty(), supportsVariantBinary);
6367
}
6468
}
6569

@@ -88,7 +92,7 @@ public ResultRows toRows(List<Column> columns, QueryData data)
8892
if (data instanceof JsonQueryData) {
8993
JsonQueryData jsonData = (JsonQueryData) data;
9094
try {
91-
return wrapIterator(JsonIterators.forJsonParser(jsonData.getJsonParser(), columns), jsonData.getRowsCount());
95+
return wrapIterator(JsonIterators.forJsonParser(jsonData.getJsonParser(), columns, supportsVariantBinary), jsonData.getRowsCount());
9296
}
9397
catch (IOException e) {
9498
throw new UncheckedIOException(e);

client/trino-client/src/main/java/io/trino/client/StatementClientFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public static StatementClient newStatementClient(Call.Factory httpCallFactory, C
2828
return new StatementClientV1(httpCallFactory, segmentHttpCallFactory, session, query, Optional.empty());
2929
}
3030

31+
public static StatementClient newStatementClient(Call.Factory httpCallFactory, Call.Factory segmentHttpCallFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
32+
{
33+
return new StatementClientV1(httpCallFactory, segmentHttpCallFactory, session, query, clientCapabilities);
34+
}
35+
3136
public static StatementClient newStatementClient(OkHttpClient httpClient, Call.Factory segmentHttpCallFactory, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
3237
{
3338
return new StatementClientV1((Call.Factory) httpClient, segmentHttpCallFactory, session, query, clientCapabilities);

client/trino-client/src/main/java/io/trino/client/StatementClientV1.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
7070
import static java.net.HttpURLConnection.HTTP_OK;
7171
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
72-
import static java.util.Arrays.stream;
7372
import static java.util.Objects.requireNonNull;
7473
import static java.util.concurrent.TimeUnit.MILLISECONDS;
7574

@@ -132,13 +131,14 @@ public StatementClientV1(Call.Factory httpCallFactory, Call.Factory segmentHttpC
132131
.map(Optional::get)
133132
.findFirst();
134133
this.setOriginalRoles.addAll(session.getOriginalRoles());
135-
this.clientCapabilities = Joiner.on(",").join(clientCapabilities.orElseGet(() -> stream(ClientCapabilities.values())
136-
.map(Enum::name)
137-
.collect(toImmutableSet())));
134+
Set<String> effectiveClientCapabilities = clientCapabilities.orElseGet(ClientCapabilities::defaultClientCapabilities);
135+
this.clientCapabilities = Joiner.on(",").join(effectiveClientCapabilities);
138136
this.compressionDisabled = session.isCompressionDisabled();
139137
this.heartbeatInterval = session.getHeartbeatInterval().toMillis() * 1_000_000;
140138

141-
this.resultRowsDecoder = new ResultRowsDecoder(new OkHttpSegmentLoader(requireNonNull(segmentHttpCallFactory, "segmentHttpCallFactory is null")));
139+
this.resultRowsDecoder = new ResultRowsDecoder(
140+
new OkHttpSegmentLoader(requireNonNull(segmentHttpCallFactory, "segmentHttpCallFactory is null")),
141+
effectiveClientCapabilities.contains(ClientCapabilities.VARIANT_BINARY.toString()));
142142

143143
Request request = buildQueryRequest(session, query, session.getEncoding());
144144
// Pass empty as materializedJsonSizeLimit to always materialize the first response

0 commit comments

Comments
 (0)