Skip to content

Commit 87d9d72

Browse files
committed
Implement native ESRI reader
1 parent d872e37 commit 87d9d72

File tree

17 files changed

+6723
-4
lines changed

17 files changed

+6723
-4
lines changed

Diff for: lib/trino-hive-formats/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
</properties>
1818

1919
<dependencies>
20+
<dependency>
21+
<groupId>com.esri.geometry</groupId>
22+
<artifactId>esri-geometry-api</artifactId>
23+
</dependency>
24+
2025
<dependency>
2126
<groupId>com.fasterxml.jackson.core</groupId>
2227
<artifactId>jackson-core</artifactId>

Diff for: lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public final class HiveClassNames
4545
public static final String SEQUENCEFILE_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.SequenceFileInputFormat";
4646
public static final String SYMLINK_TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat";
4747
public static final String TEXT_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.TextInputFormat";
48+
public static final String ESRI_SERDE_CLASS = "com.esri.hadoop.hive.serde.EsriJsonSerDe";
49+
public static final String ESRI_INPUT_FORMAT_CLASS = "com.esri.json.hadoop.EnclosedEsriJsonInputFormat";
4850

4951
private HiveClassNames() {}
5052
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.hive.formats.esri;
15+
16+
import com.esri.core.geometry.Geometry;
17+
import com.esri.core.geometry.GeometryEngine;
18+
import com.esri.core.geometry.MapGeometry;
19+
import com.esri.core.geometry.ogc.OGCGeometry;
20+
import com.fasterxml.jackson.core.JsonParser;
21+
import com.fasterxml.jackson.core.JsonToken;
22+
import io.airlift.slice.Slices;
23+
import io.trino.hive.formats.line.Column;
24+
import io.trino.plugin.base.type.DecodedTimestamp;
25+
import io.trino.spi.PageBuilder;
26+
import io.trino.spi.TrinoException;
27+
import io.trino.spi.block.BlockBuilder;
28+
import io.trino.spi.type.CharType;
29+
import io.trino.spi.type.DecimalConversions;
30+
import io.trino.spi.type.DecimalType;
31+
import io.trino.spi.type.Int128;
32+
import io.trino.spi.type.TimestampType;
33+
import io.trino.spi.type.Type;
34+
import io.trino.spi.type.VarcharType;
35+
36+
import java.io.IOException;
37+
import java.math.BigDecimal;
38+
import java.nio.ByteBuffer;
39+
import java.sql.Date;
40+
import java.sql.Timestamp;
41+
import java.time.Instant;
42+
import java.time.LocalDate;
43+
import java.time.LocalDateTime;
44+
import java.time.ZoneId;
45+
import java.time.format.DateTimeFormatter;
46+
import java.time.format.DateTimeParseException;
47+
import java.util.HashMap;
48+
import java.util.List;
49+
import java.util.Map;
50+
import java.util.TimeZone;
51+
52+
import static io.trino.plugin.base.type.TrinoTimestampEncoderFactory.createTimestampEncoder;
53+
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
54+
import static io.trino.spi.type.BigintType.BIGINT;
55+
import static io.trino.spi.type.BooleanType.BOOLEAN;
56+
import static io.trino.spi.type.Chars.truncateToLengthAndTrimSpaces;
57+
import static io.trino.spi.type.DateType.DATE;
58+
import static io.trino.spi.type.Decimals.overflows;
59+
import static io.trino.spi.type.DoubleType.DOUBLE;
60+
import static io.trino.spi.type.IntegerType.INTEGER;
61+
import static io.trino.spi.type.RealType.REAL;
62+
import static io.trino.spi.type.SmallintType.SMALLINT;
63+
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
64+
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
65+
import static io.trino.spi.type.TinyintType.TINYINT;
66+
import static io.trino.spi.type.VarbinaryType.VARBINARY;
67+
import static io.trino.spi.type.Varchars.truncateToLength;
68+
import static java.lang.Float.floatToRawIntBits;
69+
import static java.lang.StrictMath.floorDiv;
70+
import static java.lang.StrictMath.floorMod;
71+
import static java.lang.StrictMath.toIntExact;
72+
import static java.lang.String.format;
73+
import static java.math.RoundingMode.HALF_UP;
74+
import static java.util.Locale.ENGLISH;
75+
import static java.util.Objects.requireNonNull;
76+
import static org.joda.time.DateTimeZone.UTC;
77+
78+
public final class EsriDeserializer
79+
{
80+
private static final ZoneId UTC_ZONE = ZoneId.of("UTC");
81+
private static final TimeZone tz = TimeZone.getTimeZone(UTC_ZONE);
82+
private static final String GEOMETRY_FIELD_NAME = "geometry";
83+
private static final String ATTRIBUTES_FIELD_NAME = "attributes";
84+
private static final DateTimeFormatter DATE_FORMATTER =
85+
DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(UTC_ZONE);
86+
private static final List<DateTimeFormatter> TIMESTAMP_FORMATTERS = List.of(
87+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(UTC_ZONE),
88+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(UTC_ZONE),
89+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(UTC_ZONE),
90+
DATE_FORMATTER);
91+
92+
private final int numColumns;
93+
private int geometryColumn;
94+
private final List<String> columnNames;
95+
private final Map<String, Integer> columnNameToIndex;
96+
private final List<Type> columnTypes;
97+
98+
public EsriDeserializer(List<Column> columns)
99+
{
100+
numColumns = columns.size();
101+
columnNames = columns.stream().map(Column::name).toList();
102+
columnTypes = columns.stream().map(Column::type).toList();
103+
104+
columnNameToIndex = new HashMap<>();
105+
for (int i = 0; i < columnNames.size(); i++) {
106+
columnNameToIndex.put(columnNames.get(i).toLowerCase(ENGLISH), i);
107+
}
108+
109+
geometryColumn = -1;
110+
for (int i = 0; i < numColumns; i++) {
111+
if (columnTypes.get(i) == VARBINARY) {
112+
if (geometryColumn >= 0) {
113+
throw new IllegalArgumentException("Multiple binary columns defined. Define only one binary column for geometries");
114+
}
115+
geometryColumn = i;
116+
}
117+
}
118+
}
119+
120+
public void deserialize(PageBuilder pageBuilder, JsonParser parser)
121+
throws IOException
122+
{
123+
for (JsonToken token = parser.nextToken(); token != null && token != JsonToken.END_OBJECT; token = parser.nextToken()) {
124+
if (token != JsonToken.START_OBJECT) {
125+
continue;
126+
}
127+
128+
if (GEOMETRY_FIELD_NAME.equals(parser.currentName())) {
129+
if (geometryColumn > -1) {
130+
OGCGeometry ogcGeom = parseGeom(parser);
131+
if (ogcGeom != null) {
132+
byte[] esriShapeBytes = geometryToEsriShape(ogcGeom);
133+
VARBINARY.writeSlice(pageBuilder.getBlockBuilder(geometryColumn), Slices.wrappedBuffer(esriShapeBytes));
134+
}
135+
}
136+
else {
137+
parser.skipChildren();
138+
}
139+
}
140+
else if (ATTRIBUTES_FIELD_NAME.equals(parser.currentName())) {
141+
for (JsonToken token2 = parser.nextToken(); token2 != null && token2 != JsonToken.END_OBJECT; token2 = parser.nextToken()) {
142+
String name = parser.getText().toLowerCase(ENGLISH);
143+
parser.nextToken();
144+
Integer fieldIndex = columnNameToIndex.get(name);
145+
if (fieldIndex != null) {
146+
Type columnType = columnTypes.get(fieldIndex);
147+
String columnName = columnNames.get(fieldIndex);
148+
serializeValue(parser, columnType, columnName, pageBuilder.getBlockBuilder(fieldIndex), true);
149+
}
150+
}
151+
}
152+
}
153+
154+
pageBuilder.declarePosition();
155+
156+
for (int i = 0; i < numColumns; i++) {
157+
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
158+
if (blockBuilder.getPositionCount() != pageBuilder.getPositionCount()) {
159+
blockBuilder.appendNull();
160+
}
161+
}
162+
}
163+
164+
private OGCGeometry parseGeom(JsonParser parser)
165+
{
166+
MapGeometry mapGeom = GeometryEngine.jsonToGeometry(parser);
167+
return OGCGeometry.createFromEsriGeometry(mapGeom.getGeometry(), mapGeom.getSpatialReference());
168+
}
169+
170+
private byte[] geometryToEsriShape(OGCGeometry ogcGeometry)
171+
{
172+
requireNonNull(ogcGeometry, "ogcGeometry must not be null");
173+
174+
int wkid = ogcGeometry.SRID();
175+
176+
OGCType ogcType;
177+
String typeName = ogcGeometry.geometryType();
178+
ogcType = switch (typeName) {
179+
case "Point" -> OGCType.ST_POINT;
180+
case "LineString" -> OGCType.ST_LINESTRING;
181+
case "Polygon" -> OGCType.ST_POLYGON;
182+
case "MultiPoint" -> OGCType.ST_MULTIPOINT;
183+
case "MultiLineString" -> OGCType.ST_MULTILINESTRING;
184+
case "MultiPolygon" -> OGCType.ST_MULTIPOLYGON;
185+
default -> OGCType.UNKNOWN;
186+
};
187+
188+
return serializeGeometry(ogcGeometry.getEsriGeometry(), wkid, ogcType);
189+
}
190+
191+
private static byte[] serializeGeometry(Geometry geometry, int wkid, OGCType type)
192+
{
193+
if (geometry == null) {
194+
return null;
195+
}
196+
else {
197+
byte[] shape = GeometryEngine.geometryToEsriShape(geometry);
198+
if (shape == null) {
199+
return null;
200+
}
201+
else {
202+
byte[] shapeWithData = new byte[shape.length + 4 + 1];
203+
System.arraycopy(shape, 0, shapeWithData, 5, shape.length);
204+
205+
setWKID(shapeWithData, wkid);
206+
setType(shapeWithData, type);
207+
208+
return shapeWithData;
209+
}
210+
}
211+
}
212+
213+
private static void setWKID(byte[] geomref, int wkid)
214+
{
215+
ByteBuffer bb = ByteBuffer.allocate(4);
216+
bb.putInt(wkid);
217+
System.arraycopy(bb.array(), 0, geomref, 0, 4);
218+
}
219+
220+
private static void setType(byte[] geomref, OGCType type)
221+
{
222+
geomref[4] = (byte) type.getIndex();
223+
}
224+
225+
private void serializeValue(JsonParser parser, Type columnType, String columnName, BlockBuilder builder, boolean nullOnParseError)
226+
{
227+
try {
228+
if (BOOLEAN.equals(columnType)) {
229+
columnType.writeBoolean(builder, parser.getBooleanValue());
230+
}
231+
else if (BIGINT.equals(columnType)) {
232+
columnType.writeLong(builder, parser.getLongValue());
233+
}
234+
else if (INTEGER.equals(columnType)) {
235+
columnType.writeLong(builder, parser.getIntValue());
236+
}
237+
else if (SMALLINT.equals(columnType)) {
238+
columnType.writeLong(builder, parser.getShortValue());
239+
}
240+
else if (TINYINT.equals(columnType)) {
241+
columnType.writeLong(builder, parser.getByteValue());
242+
}
243+
else if (columnType instanceof DecimalType decimalType) {
244+
serializeDecimal(parser.getText(), decimalType, builder);
245+
}
246+
else if (REAL.equals(columnType)) {
247+
columnType.writeLong(builder, floatToRawIntBits(parser.getFloatValue()));
248+
}
249+
else if (DOUBLE.equals(columnType)) {
250+
columnType.writeDouble(builder, parser.getDoubleValue());
251+
}
252+
else if (DATE.equals(columnType)) {
253+
columnType.writeLong(builder, toIntExact(parseDate(parser).getTime() / 86400000L));
254+
}
255+
else if (columnType instanceof TimestampType timestampType) {
256+
Timestamp timestamp = parseTime(parser);
257+
DecodedTimestamp decodedTimestamp = createDecodedTimestamp(timestamp);
258+
259+
createTimestampEncoder(timestampType, UTC).write(decodedTimestamp, builder);
260+
}
261+
else if (columnType instanceof VarcharType varcharType) {
262+
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
263+
builder.appendNull();
264+
}
265+
else {
266+
columnType.writeSlice(builder, truncateToLength(Slices.utf8Slice(parser.getText()), varcharType));
267+
}
268+
}
269+
else if (columnType instanceof CharType charType) {
270+
columnType.writeSlice(builder, truncateToLengthAndTrimSpaces(Slices.utf8Slice(parser.getText()), charType));
271+
}
272+
else {
273+
throw new RuntimeException("Column '" + columnName + "' with type: " + columnType.getDisplayName() + " is not supported");
274+
}
275+
}
276+
catch (Exception e) {
277+
// invalid columns are ignored
278+
if (nullOnParseError) {
279+
builder.appendNull();
280+
}
281+
else {
282+
throw new TrinoException(GENERIC_USER_ERROR, "Error Parsing a column in the table: " + e.getMessage(), e);
283+
}
284+
}
285+
}
286+
287+
private static DecodedTimestamp createDecodedTimestamp(Timestamp timestamp)
288+
{
289+
long millis = timestamp.getTime();
290+
long epochSeconds = floorDiv(millis, (long) MILLISECONDS_PER_SECOND);
291+
long fractionalSecond = floorMod(millis, (long) MILLISECONDS_PER_SECOND);
292+
int nanosOfSecond = toIntExact(fractionalSecond * (long) NANOSECONDS_PER_MILLISECOND);
293+
294+
return new DecodedTimestamp(epochSeconds, nanosOfSecond);
295+
}
296+
297+
private static void serializeDecimal(String value, DecimalType decimalType, BlockBuilder builder)
298+
{
299+
BigDecimal bigDecimal;
300+
try {
301+
bigDecimal = new BigDecimal(value).setScale(DecimalConversions.intScale(decimalType.getScale()), HALF_UP);
302+
}
303+
catch (NumberFormatException e) {
304+
throw new NumberFormatException(format("Cannot convert '%s' to %s. Value is not a number.", value, decimalType));
305+
}
306+
307+
if (overflows(bigDecimal, decimalType.getPrecision())) {
308+
throw new IllegalArgumentException(format("Cannot convert '%s' to %s. Value too large.", value, decimalType));
309+
}
310+
311+
if (decimalType.isShort()) {
312+
decimalType.writeLong(builder, bigDecimal.unscaledValue().longValueExact());
313+
}
314+
else {
315+
decimalType.writeObject(builder, Int128.valueOf(bigDecimal.unscaledValue()));
316+
}
317+
}
318+
319+
private Date parseDate(JsonParser parser)
320+
throws IOException
321+
{
322+
if (JsonToken.VALUE_NUMBER_INT.equals(parser.getCurrentToken())) {
323+
long epoch = parser.getLongValue();
324+
return new Date(epoch - (long) tz.getOffset(epoch));
325+
}
326+
327+
try {
328+
LocalDate localDate = LocalDate.parse(parser.getText(), DATE_FORMATTER);
329+
// Add 12 hours (43200000L milliseconds) to handle noon conversion
330+
return new Date(localDate.atStartOfDay(UTC_ZONE)
331+
.toInstant()
332+
.toEpochMilli() + 43200000L);
333+
}
334+
catch (DateTimeParseException e) {
335+
throw new IllegalArgumentException(
336+
String.format("Value '%s' cannot be parsed to a Date. Expected format: yyyy-MM-dd",
337+
parser.getText()), e);
338+
}
339+
}
340+
341+
private Timestamp parseTime(JsonParser parser)
342+
throws IOException
343+
{
344+
if (JsonToken.VALUE_NUMBER_INT.equals(parser.getCurrentToken())) {
345+
long epoch = parser.getLongValue();
346+
return new Timestamp(epoch);
347+
}
348+
349+
String value = parser.getText();
350+
int point = value.indexOf('.');
351+
String dateStr = point < 0 ? value : value.substring(0, Math.min(point + 4, value.length()));
352+
353+
for (DateTimeFormatter formatter : TIMESTAMP_FORMATTERS) {
354+
try {
355+
Instant instant;
356+
if (formatter.equals(DATE_FORMATTER)) {
357+
LocalDate localDate = LocalDate.parse(dateStr, formatter);
358+
instant = localDate.atStartOfDay(UTC_ZONE).toInstant();
359+
}
360+
else {
361+
LocalDateTime dateTime = LocalDateTime.parse(dateStr, formatter);
362+
instant = dateTime.atZone(UTC_ZONE).toInstant();
363+
}
364+
return Timestamp.from(instant);
365+
}
366+
catch (DateTimeParseException e) {
367+
// Try next formatter
368+
}
369+
}
370+
371+
throw new IllegalArgumentException(
372+
String.format("Value '%s' cannot be parsed to a Timestamp", dateStr));
373+
}
374+
}

0 commit comments

Comments
 (0)