diff --git a/athena-example/src/main/java/com/amazonaws/athena/connectors/example/ExampleRecordHandler.java b/athena-example/src/main/java/com/amazonaws/athena/connectors/example/ExampleRecordHandler.java index dc522c86e4..b636d5a248 100644 --- a/athena-example/src/main/java/com/amazonaws/athena/connectors/example/ExampleRecordHandler.java +++ b/athena-example/src/main/java/com/amazonaws/athena/connectors/example/ExampleRecordHandler.java @@ -50,8 +50,12 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.lang.String.format; @@ -121,11 +125,12 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor * TODO: Extract information about what we need to read from the split. If you are following the tutorial * this is basically the partition column values for year, month, day. * + * * + * */ splitYear = split.getPropertyAsInt("year"); splitMonth = split.getPropertyAsInt("month"); splitDay = split.getPropertyAsInt("day"); - * - */ + String dataBucket = null; /** @@ -150,6 +155,7 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor * optimized code for converting our data to Apache Arrow, automatically minimizing memory overhead, code * branches, etc... Later in the code when we call RowWriter for each line in our S3 file * + * */ builder.withExtractor("year", (IntExtractor) (Object context, NullableIntHolder value) -> { value.isSet = 1; value.value = Integer.parseInt(((String[]) context)[0]); }); @@ -169,24 +175,26 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor value.isSet = 1; value.value = ((String[]) context)[6]; }); - */ + /** * TODO: The account_id field is a sensitive field, so we'd like to mask it to the last 4 before * returning it to Athena. 
Note that this will mean you can only filter (where/having) * on the masked value from Athena. * + * */ builder.withExtractor("account_id", (VarCharExtractor) (Object context, NullableVarCharHolder value) -> { value.isSet = 1; String accountId = ((String[]) context)[3]; value.value = accountId.length() > 4 ? accountId.substring(accountId.length() - 4) : accountId; }); - */ + /** * TODO: Write data for our transaction STRUCT: * For complex types like List and Struct, we can build a Map to conveniently set nested values * + * */ builder.withFieldWriterFactory("transaction", (FieldVector vector, Extractor extractor, ConstraintProjector constraint) -> (Object context, int rowNum) -> { @@ -196,7 +204,27 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor BlockUtils.setComplexValue(vector, rowNum, FieldResolver.DEFAULT, eventMap); return true; //we don't yet support predicate pushdown on complex types }); - */ + + + builder.withFieldWriterFactory("map_field", + (FieldVector vector, Extractor extractor, ConstraintProjector constraint) -> + (Object context, int rowNum) -> { + Map eventMap = new HashMap<>(); + String[] entries = (((String[])context)[7]).split("&"); + + List keys = new ArrayList<>(entries.length); + List values = new ArrayList<>(entries.length); + for(String entry: entries) { + String[] eachEntry = entry.split("="); + keys.add(eachEntry[0]); + values.add(eachEntry[1]); + } + Map> entryMap = new HashMap<>(2); + entryMap.put("key", keys); + entryMap.put("value", values); + BlockUtils.setComplexValue(vector, rowNum, FieldResolver.DEFAULT, entryMap); + return true; //we don't yet support predicate pushdown on complex types + }); //Used some basic code-gen to optimize how we generate response data. 
GeneratedRowWriter rowWriter = builder.build(); diff --git a/athena-example/src/test/java/com/amazonaws/athena/connectors/example/ExampleRecordHandlerTest.java b/athena-example/src/test/java/com/amazonaws/athena/connectors/example/ExampleRecordHandlerTest.java index 5a5af6191b..a02b1c2e98 100644 --- a/athena-example/src/test/java/com/amazonaws/athena/connectors/example/ExampleRecordHandlerTest.java +++ b/athena-example/src/test/java/com/amazonaws/athena/connectors/example/ExampleRecordHandlerTest.java @@ -100,6 +100,7 @@ public void setUp() .addChildField("transaction", "id", Types.MinorType.INT.getType()) .addChildField("transaction", "completed", Types.MinorType.BIT.getType()) .addMetadata("partitionCols", "year,month,day") + .addMapField("map_field") .build(); allocator = new BlockAllocatorImpl(); @@ -132,6 +133,7 @@ public Object answer(InvocationOnMock invocationOnMock) public void doReadRecordsNoSpill() throws Exception { + /* if (!enableTests) { //We do this because until you complete the tutorial these tests will fail. When you attempt to publish //using ../tools/publish.sh ... it will set the publishing flag and force these tests.
This is how we @@ -142,6 +144,8 @@ public void doReadRecordsNoSpill() return; } + */ + for (int i = 0; i < 2; i++) { Map constraintsMap = new HashMap<>(); @@ -175,17 +179,17 @@ private byte[] getFakeObject() throws UnsupportedEncodingException { StringBuilder sb = new StringBuilder(); - sb.append("2017,11,1,2122792308,1755604178,false,0UTIXoWnKqtQe8y+BSHNmdEXmWfQalRQH60pobsgwws=\n"); - sb.append("2017,11,1,2030248245,747575690,false,i9AoMmLI6JidPjw/SFXduBB6HUmE8aXQLMhekhIfE1U=\n"); - sb.append("2017,11,1,23301515,1720603622,false,HWsLCXAnGFXnnjD8Nc1RbO0+5JzrhnCB/feJ/EzSxto=\n"); - sb.append("2017,11,1,1342018392,1167647466,false,lqL0mxeOeEesRY7EU95Fi6QEW92nj2mh8xyex69j+8A=\n"); - sb.append("2017,11,1,945994127,1854103174,true,C57VAyZ6Y0C+xKA2Lv6fOcIP0x6Px8BlEVBGSc74C4I=\n"); - sb.append("2017,11,1,1102797454,2117019257,true,oO0S69X+N2RSyEhlzHguZSLugO8F2cDVDpcAslg0hhQ=\n"); - sb.append("2017,11,1,862601609,392155621,true,L/Wpz4gHiRR7Sab1RCBrp4i1k+0IjUuJAV/Yn/7kZnc=\n"); - sb.append("2017,11,1,1858905353,1131234096,false,w4R3N+vN/EcwrWP7q/h2DwyhyraM1AwLbCbe26a+mQ0=\n"); - sb.append("2017,11,1,1300070253,247762646,false,cjbs6isGO0K7ib1D65VbN4lZEwQv2Y6Q/PoFZhyyacA=\n"); - sb.append("2017,11,1,843851309,1886346292,true,sb/xc+uoe/ZXRXTYIv9OTY33Rj+zSS96Mj/3LVPXvRM=\n"); - sb.append("2017,11,1,2013370128,1783091056,false,9MW9X3OUr40r4B/qeLz55yJIrvw7Gdk8RWUulNadIyw=\n"); + sb.append("2017,11,1,2122792308,1755604178,false,0UTIXoWnKqtQe8y+BSHNmdEXmWfQalRQH60pobsgwws=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,2030248245,747575690,false,i9AoMmLI6JidPjw/SFXduBB6HUmE8aXQLMhekhIfE1U=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,23301515,1720603622,false,HWsLCXAnGFXnnjD8Nc1RbO0+5JzrhnCB/feJ/EzSxto=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,1342018392,1167647466,false,lqL0mxeOeEesRY7EU95Fi6QEW92nj2mh8xyex69j+8A=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,945994127,1854103174,true,C57VAyZ6Y0C+xKA2Lv6fOcIP0x6Px8BlEVBGSc74C4I=,k1=v1&k2=v2\n"); + 
sb.append("2017,11,1,1102797454,2117019257,true,oO0S69X+N2RSyEhlzHguZSLugO8F2cDVDpcAslg0hhQ=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,862601609,392155621,true,L/Wpz4gHiRR7Sab1RCBrp4i1k+0IjUuJAV/Yn/7kZnc=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,1858905353,1131234096,false,w4R3N+vN/EcwrWP7q/h2DwyhyraM1AwLbCbe26a+mQ0=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,1300070253,247762646,false,cjbs6isGO0K7ib1D65VbN4lZEwQv2Y6Q/PoFZhyyacA=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,843851309,1886346292,true,sb/xc+uoe/ZXRXTYIv9OTY33Rj+zSS96Mj/3LVPXvRM=,k1=v1&k2=v2\n"); + sb.append("2017,11,1,2013370128,1783091056,false,9MW9X3OUr40r4B/qeLz55yJIrvw7Gdk8RWUulNadIyw=,k1=v1&k2=v2\n"); return sb.toString().getBytes("UTF-8"); } diff --git a/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/SchemaBuilder.java b/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/SchemaBuilder.java index 41ce172b43..10877a6040 100644 --- a/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/SchemaBuilder.java +++ b/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/SchemaBuilder.java @@ -27,6 +27,7 @@ import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -89,6 +90,24 @@ public SchemaBuilder addStructField(String fieldName) return this; } + /** + * Adds a new MAP Field to the Schema as a top-level Field. + * + * @param fieldName The name of the field to add. + * @return This SchemaBuilder itself. 
+ */ + public SchemaBuilder addMapField(String fieldName) + { + fields.put(fieldName, + new Field(fieldName, FieldType.nullable(new ArrowType.Map(false)), + Arrays.asList(new Field("entries", false, Types.MinorType.STRUCT.getType(), + Arrays.asList( + new Field("key", false, Types.MinorType.LIST.getType(), Collections.singletonList(new Field("", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null))), + new Field("value", true, Types.MinorType.LIST.getType(), Collections.singletonList(new Field("", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null))) + ))))); + return this; + } + /** * Adds a new LIST Field to the Schema as a top-level Field. *