Skip to content

Commit f02ca1b

Browse files
committed
Implement native ESRI reader
1 parent bc016bf commit f02ca1b

File tree

17 files changed

+6629
-5
lines changed

17 files changed

+6629
-5
lines changed

Diff for: lib/trino-hive-formats/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
</properties>
1818

1919
<dependencies>
20+
<dependency>
21+
<groupId>com.esri.geometry</groupId>
22+
<artifactId>esri-geometry-api</artifactId>
23+
</dependency>
24+
2025
<dependency>
2126
<groupId>com.fasterxml.jackson.core</groupId>
2227
<artifactId>jackson-core</artifactId>

Diff for: lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public final class HiveClassNames
4545
public static final String SEQUENCEFILE_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.SequenceFileInputFormat";
4646
public static final String SYMLINK_TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat";
4747
public static final String TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.TextInputFormat";
48+
public static final String ESRI_SERDE_CLASS = "com.esri.hadoop.hive.serde.EsriJsonSerDe";
49+
public static final String ESRI_INPUT_FORMAT_CLASS = "com.esri.json.hadoop.EnclosedEsriJsonInputFormat";
4850

4951
private HiveClassNames() {}
5052
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.hive.formats.esri;
15+
16+
import com.esri.core.geometry.Geometry;
17+
import com.esri.core.geometry.GeometryEngine;
18+
import com.esri.core.geometry.MapGeometry;
19+
import com.esri.core.geometry.ogc.OGCGeometry;
20+
import com.fasterxml.jackson.core.JsonFactory;
21+
import com.fasterxml.jackson.core.JsonParser;
22+
import com.fasterxml.jackson.core.JsonToken;
23+
import io.airlift.slice.Slices;
24+
import io.trino.hive.formats.line.Column;
25+
import io.trino.plugin.base.type.DecodedTimestamp;
26+
import io.trino.spi.PageBuilder;
27+
import io.trino.spi.TrinoException;
28+
import io.trino.spi.block.BlockBuilder;
29+
import io.trino.spi.type.CharType;
30+
import io.trino.spi.type.DecimalConversions;
31+
import io.trino.spi.type.DecimalType;
32+
import io.trino.spi.type.Int128;
33+
import io.trino.spi.type.TimestampType;
34+
import io.trino.spi.type.Type;
35+
import io.trino.spi.type.VarcharType;
36+
37+
import java.io.IOException;
38+
import java.math.BigDecimal;
39+
import java.nio.ByteBuffer;
40+
import java.sql.Date;
41+
import java.sql.Timestamp;
42+
import java.text.ParseException;
43+
import java.text.SimpleDateFormat;
44+
import java.time.ZoneId;
45+
import java.util.List;
46+
import java.util.TimeZone;
47+
48+
import static io.trino.plugin.base.type.TrinoTimestampEncoderFactory.createTimestampEncoder;
49+
import static io.trino.plugin.base.util.JsonUtils.jsonFactory;
50+
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
51+
import static io.trino.spi.type.BigintType.BIGINT;
52+
import static io.trino.spi.type.BooleanType.BOOLEAN;
53+
import static io.trino.spi.type.Chars.truncateToLengthAndTrimSpaces;
54+
import static io.trino.spi.type.DateType.DATE;
55+
import static io.trino.spi.type.Decimals.overflows;
56+
import static io.trino.spi.type.DoubleType.DOUBLE;
57+
import static io.trino.spi.type.IntegerType.INTEGER;
58+
import static io.trino.spi.type.RealType.REAL;
59+
import static io.trino.spi.type.SmallintType.SMALLINT;
60+
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
61+
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
62+
import static io.trino.spi.type.TinyintType.TINYINT;
63+
import static io.trino.spi.type.VarbinaryType.VARBINARY;
64+
import static io.trino.spi.type.Varchars.truncateToLength;
65+
import static java.lang.Float.floatToRawIntBits;
66+
import static java.lang.StrictMath.floorDiv;
67+
import static java.lang.StrictMath.floorMod;
68+
import static java.lang.StrictMath.toIntExact;
69+
import static java.lang.String.format;
70+
import static java.math.RoundingMode.HALF_UP;
71+
import static java.util.Locale.ENGLISH;
72+
import static org.joda.time.DateTimeZone.UTC;
73+
74+
public final class EsriDeserializer
75+
{
76+
// Time zone used when parsing textual dates/timestamps and when offsetting numeric dates
private static final TimeZone tz = TimeZone.getTimeZone(ZoneId.of("UTC"));
private static final JsonFactory jsonFactory = jsonFactory();
// Names of the two top-level JSON objects of a feature that this reader consumes
private static final String GEOMETRY_FIELD_NAME = "geometry";
private static final String ATTRIBUTES_FIELD_NAME = "attributes";
// Textual timestamp patterns, tried in this order until one parses
private static final String[] TIMESTAMP_FORMATS = new String[] {"yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm", "yyyy-MM-dd"};

private final int numColumns;
// Index of the single VARBINARY column that receives the geometry, or -1 when there is none
private final int geometryColumn;
private final List<String> columnNames;
private final List<Type> columnTypes;

/**
 * Creates a deserializer for the given table columns.
 * <p>
 * The only VARBINARY column, if any, is treated as the geometry column.
 *
 * @param columns the columns to populate, in page channel order
 * @throws IllegalArgumentException if more than one VARBINARY column is declared
 */
public EsriDeserializer(List<Column> columns)
{
    numColumns = columns.size();
    columnNames = columns.stream().map(Column::name).toList();
    columnTypes = columns.stream().map(Column::type).toList();

    int binaryColumn = -1;
    int index = 0;
    for (Type type : columnTypes) {
        if (type == VARBINARY) {
            if (binaryColumn >= 0) {
                throw new IllegalArgumentException("Multiple binary columns defined. Define only one binary column for geometries");
            }
            binaryColumn = index;
        }
        index++;
    }
    geometryColumn = binaryColumn;
}
103+
104+
public void deserialize(PageBuilder pageBuilder, String jsonStr)
105+
throws IOException
106+
{
107+
if (jsonStr == null) {
108+
return;
109+
}
110+
111+
try (JsonParser parser = jsonFactory.createParser(jsonStr)) {
112+
for (JsonToken token = parser.nextToken(); token != null; token = parser.nextToken()) {
113+
if (token != JsonToken.START_OBJECT) {
114+
continue;
115+
}
116+
117+
if (GEOMETRY_FIELD_NAME.equals(parser.currentName())) {
118+
if (geometryColumn > -1) {
119+
OGCGeometry ogcGeom = parseGeom(parser);
120+
if (ogcGeom != null) {
121+
byte[] esriShapeBytes = geometryToEsriShape(ogcGeom);
122+
VARBINARY.writeSlice(pageBuilder.getBlockBuilder(geometryColumn), Slices.wrappedBuffer(esriShapeBytes));
123+
}
124+
}
125+
else {
126+
parser.skipChildren();
127+
}
128+
}
129+
else if (ATTRIBUTES_FIELD_NAME.equals(parser.currentName())) {
130+
for (JsonToken token2 = parser.nextToken(); token2 != JsonToken.END_OBJECT && token2 != null; token2 = parser.nextToken()) {
131+
String name = parser.getText().toLowerCase(ENGLISH);
132+
parser.nextToken();
133+
int fieldIndex = columnNames.indexOf(name);
134+
if (fieldIndex >= 0) {
135+
Type columnType = columnTypes.get(fieldIndex);
136+
String columnName = columnNames.get(fieldIndex);
137+
serializeValue(parser, columnType, columnName, pageBuilder.getBlockBuilder(fieldIndex), true);
138+
}
139+
}
140+
}
141+
}
142+
143+
pageBuilder.declarePosition();
144+
145+
for (int i = 0; i < numColumns; i++) {
146+
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
147+
if (blockBuilder.getPositionCount() != pageBuilder.getPositionCount()) {
148+
blockBuilder.appendNull();
149+
}
150+
}
151+
}
152+
}
153+
154+
private OGCGeometry parseGeom(JsonParser parser)
155+
{
156+
MapGeometry mapGeom = GeometryEngine.jsonToGeometry(parser);
157+
return OGCGeometry.createFromEsriGeometry(mapGeom.getGeometry(), mapGeom.getSpatialReference());
158+
}
159+
160+
private byte[] geometryToEsriShape(OGCGeometry ogcGeometry)
161+
{
162+
int wkid;
163+
try {
164+
wkid = ogcGeometry.SRID();
165+
}
166+
catch (NullPointerException e) {
167+
wkid = 0;
168+
}
169+
170+
OGCType ogcType;
171+
try {
172+
String typeName = ogcGeometry.geometryType();
173+
ogcType = switch (typeName) {
174+
case "Point" -> OGCType.ST_POINT;
175+
case "LineString" -> OGCType.ST_LINESTRING;
176+
case "Polygon" -> OGCType.ST_POLYGON;
177+
case "MultiPoint" -> OGCType.ST_MULTIPOINT;
178+
case "MultiLineString" -> OGCType.ST_MULTILINESTRING;
179+
case "MultiPolygon" -> OGCType.ST_MULTIPOLYGON;
180+
default -> OGCType.UNKNOWN;
181+
};
182+
}
183+
catch (NullPointerException e) {
184+
ogcType = OGCType.UNKNOWN;
185+
}
186+
187+
return serializeGeometry(ogcGeometry.getEsriGeometry(), wkid, ogcType);
188+
}
189+
190+
private static byte[] serializeGeometry(Geometry geometry, int wkid, OGCType type)
191+
{
192+
if (geometry == null) {
193+
return null;
194+
}
195+
else {
196+
byte[] shape = GeometryEngine.geometryToEsriShape(geometry);
197+
if (shape == null) {
198+
return null;
199+
}
200+
else {
201+
byte[] shapeWithData = new byte[shape.length + 4 + 1];
202+
System.arraycopy(shape, 0, shapeWithData, 5, shape.length);
203+
204+
setWKID(shapeWithData, wkid);
205+
setType(shapeWithData, type);
206+
207+
return shapeWithData;
208+
}
209+
}
210+
}
211+
212+
private static void setWKID(byte[] geomref, int wkid)
213+
{
214+
ByteBuffer bb = ByteBuffer.allocate(4);
215+
bb.putInt(wkid);
216+
System.arraycopy(bb.array(), 0, geomref, 0, 4);
217+
}
218+
219+
private static void setType(byte[] geomref, OGCType type)
220+
{
221+
geomref[4] = (byte) type.getIndex();
222+
}
223+
224+
private void serializeValue(JsonParser parser, Type columnType, String columnName, BlockBuilder builder, boolean nullOnParseError)
225+
{
226+
try {
227+
if (BOOLEAN.equals(columnType)) {
228+
columnType.writeBoolean(builder, parser.getBooleanValue());
229+
}
230+
else if (BIGINT.equals(columnType)) {
231+
columnType.writeLong(builder, parser.getLongValue());
232+
}
233+
else if (INTEGER.equals(columnType)) {
234+
columnType.writeLong(builder, parser.getIntValue());
235+
}
236+
else if (SMALLINT.equals(columnType)) {
237+
columnType.writeLong(builder, parser.getShortValue());
238+
}
239+
else if (TINYINT.equals(columnType)) {
240+
columnType.writeLong(builder, parser.getByteValue());
241+
}
242+
else if (columnType instanceof DecimalType decimalType) {
243+
serializeDecimal(parser.getText(), decimalType, builder);
244+
}
245+
else if (REAL.equals(columnType)) {
246+
columnType.writeLong(builder, floatToRawIntBits(parser.getFloatValue()));
247+
}
248+
else if (DOUBLE.equals(columnType)) {
249+
columnType.writeDouble(builder, parser.getDoubleValue());
250+
}
251+
else if (DATE.equals(columnType)) {
252+
columnType.writeLong(builder, toIntExact(parseDate(parser).getTime() / 86400000L));
253+
}
254+
else if (columnType instanceof TimestampType timestampType) {
255+
Timestamp timestamp = parseTime(parser);
256+
DecodedTimestamp decodedTimestamp = createDecodedTimestamp(timestamp);
257+
258+
createTimestampEncoder(timestampType, UTC).write(decodedTimestamp, builder);
259+
}
260+
else if (columnType instanceof VarcharType varcharType) {
261+
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
262+
builder.appendNull();
263+
}
264+
else {
265+
columnType.writeSlice(builder, truncateToLength(Slices.utf8Slice(parser.getText()), varcharType));
266+
}
267+
}
268+
else if (columnType instanceof CharType charType) {
269+
columnType.writeSlice(builder, truncateToLengthAndTrimSpaces(Slices.utf8Slice(parser.getText()), charType));
270+
}
271+
else {
272+
throw new RuntimeException("Column '" + columnName + "' with type: " + columnType.getDisplayName() + " is not supported");
273+
}
274+
}
275+
catch (Exception e) {
276+
// invalid columns are ignored
277+
if (nullOnParseError) {
278+
builder.appendNull();
279+
}
280+
else {
281+
throw new TrinoException(GENERIC_USER_ERROR, "Error Parsing a column in the table: " + e.getMessage(), e);
282+
}
283+
}
284+
}
285+
286+
private static DecodedTimestamp createDecodedTimestamp(Timestamp timestamp)
287+
{
288+
long millis = timestamp.getTime();
289+
long epochSeconds = floorDiv(millis, (long) MILLISECONDS_PER_SECOND);
290+
long fractionalSecond = floorMod(millis, (long) MILLISECONDS_PER_SECOND);
291+
int nanosOfSecond = toIntExact(fractionalSecond * (long) NANOSECONDS_PER_MILLISECOND);
292+
293+
return new DecodedTimestamp(epochSeconds, nanosOfSecond);
294+
}
295+
296+
private static void serializeDecimal(String value, DecimalType decimalType, BlockBuilder builder)
297+
{
298+
BigDecimal bigDecimal;
299+
try {
300+
bigDecimal = new BigDecimal(value).setScale(DecimalConversions.intScale(decimalType.getScale()), HALF_UP);
301+
}
302+
catch (NumberFormatException e) {
303+
throw new NumberFormatException(format("Cannot convert '%s' to %s. Value is not a number.", value, decimalType));
304+
}
305+
306+
if (overflows(bigDecimal, decimalType.getPrecision())) {
307+
throw new IllegalArgumentException(format("Cannot convert '%s' to %s. Value too large.", value, decimalType));
308+
}
309+
310+
if (decimalType.isShort()) {
311+
decimalType.writeLong(builder, bigDecimal.unscaledValue().longValueExact());
312+
}
313+
else {
314+
decimalType.writeObject(builder, Int128.valueOf(bigDecimal.unscaledValue()));
315+
}
316+
}
317+
318+
private Date parseDate(JsonParser parser)
319+
throws IOException
320+
{
321+
Date jsd = null;
322+
if (JsonToken.VALUE_NUMBER_INT.equals(parser.getCurrentToken())) {
323+
long epoch = parser.getLongValue();
324+
jsd = new Date(epoch - (long) tz.getOffset(epoch));
325+
}
326+
else {
327+
try {
328+
long epoch = parseTime(parser.getText(), "yyyy-MM-dd");
329+
jsd = new Date(epoch + 43200000L);
330+
}
331+
catch (ParseException e) {
332+
}
333+
}
334+
335+
return jsd;
336+
}
337+
338+
private Timestamp parseTime(JsonParser parser)
339+
throws IOException
340+
{
341+
Timestamp jst = null;
342+
if (JsonToken.VALUE_NUMBER_INT.equals(parser.getCurrentToken())) {
343+
long epoch = parser.getLongValue();
344+
jst = new Timestamp(epoch);
345+
}
346+
else {
347+
String value = parser.getText();
348+
int point = value.indexOf(46);
349+
String dateStr = point < 0 ? value : value.substring(0, point + 4);
350+
351+
for (String format : TIMESTAMP_FORMATS) {
352+
try {
353+
jst = new Timestamp(parseTime(dateStr, format));
354+
break;
355+
}
356+
catch (ParseException e) {
357+
}
358+
}
359+
}
360+
361+
return jst;
362+
}
363+
364+
private long parseTime(String value, String format)
365+
throws ParseException
366+
{
367+
SimpleDateFormat dtFmt = new SimpleDateFormat(format);
368+
dtFmt.setTimeZone(tz);
369+
return dtFmt.parse(value).getTime();
370+
}
371+
}

0 commit comments

Comments
 (0)