Skip to content

Commit b7ff2ec

Browse files
committed
Add test for data skipping using row-group level geospatial statistics
1 parent 20c029c commit b7ff2ec

File tree

1 file changed

+192
-0
lines changed

1 file changed

+192
-0
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.data.parquet;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.geospatial.GeospatialBound;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
import org.apache.iceberg.types.Types;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKBWriter;
public class TestGeospatialMetricsRowGroupFilter {
52+
private static final Schema SCHEMA =
53+
new Schema(
54+
Types.NestedField.required(1, "id", Types.IntegerType.get()),
55+
Types.NestedField.optional(2, "geom", Types.GeometryType.crs84()));
56+
57+
@TempDir private File tempDir;
58+
59+
private static class ParquetFileMetadata {
60+
BlockMetaData blockMetaData;
61+
MessageType schema;
62+
}
63+
64+
private ParquetFileMetadata nonEmptyBlockMetadata;
65+
private ParquetFileMetadata emptyBlockMetadata;
66+
private ParquetFileMetadata nullBlockMetadata;
67+
68+
@BeforeEach
69+
public void createTestFiles() throws IOException {
70+
File nonEmptyFile = new File(tempDir, "test_file_non_empty.parquet");
71+
File emptyFile = new File(tempDir, "test_file_empty.parquet");
72+
File nullFile = new File(tempDir, "test_file_null.parquet");
73+
74+
GeometryFactory factory = new GeometryFactory();
75+
WKBWriter wkbWriter = new WKBWriter();
76+
77+
// Create test files with different geometries
78+
GenericRecord record = GenericRecord.create(SCHEMA);
79+
record.setField("id", 1);
80+
Geometry polygon = factory.toGeometry(new Envelope(1, 2, 3, 4));
81+
byte[] polygonWkb = wkbWriter.write(polygon);
82+
83+
record.setField("geom", ByteBuffer.wrap(polygonWkb));
84+
nonEmptyBlockMetadata = createFileWithRecord(nonEmptyFile, record);
85+
86+
byte[] emptyLineString = wkbWriter.write(factory.createLineString());
87+
record.setField("geom", ByteBuffer.wrap(emptyLineString));
88+
emptyBlockMetadata = createFileWithRecord(emptyFile, record);
89+
90+
record.setField("geom", null);
91+
nullBlockMetadata = createFileWithRecord(nullFile, record);
92+
}
93+
94+
private ParquetFileMetadata createFileWithRecord(File file, GenericRecord record)
95+
throws IOException {
96+
OutputFile outputFile = Files.localOutput(file);
97+
try (FileAppender<Record> appender =
98+
Parquet.write(outputFile)
99+
.schema(SCHEMA)
100+
.createWriterFunc(fileSchema -> InternalWriter.create(SCHEMA.asStruct(), fileSchema))
101+
.build()) {
102+
appender.add(record);
103+
}
104+
105+
LocalInputFile inFile = new LocalInputFile(file.toPath());
106+
try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
107+
assertThat(reader.getRowGroups()).as("Should create only one row group").hasSize(1);
108+
ParquetFileMetadata metadata = new ParquetFileMetadata();
109+
metadata.schema = reader.getFileMetaData().getSchema();
110+
metadata.blockMetaData = reader.getRowGroups().get(0);
111+
return metadata;
112+
}
113+
}
114+
115+
@Test
116+
public void testHitNonEmptyFile() {
117+
boolean shouldRead =
118+
shouldReadParquet(
119+
Expressions.stIntersects("geom", createBoundingBox(1, 3, 2, 4)), nonEmptyBlockMetadata);
120+
assertThat(shouldRead).isTrue();
121+
122+
shouldRead =
123+
shouldReadParquet(
124+
Expressions.stIntersects("geom", createBoundingBox(1, 2, 3, 4)), nonEmptyBlockMetadata);
125+
assertThat(shouldRead).isTrue();
126+
127+
shouldRead =
128+
shouldReadParquet(
129+
Expressions.stIntersects("geom", createBoundingBox(0, 0, 5, 5)), nonEmptyBlockMetadata);
130+
assertThat(shouldRead).isTrue();
131+
132+
shouldRead =
133+
shouldReadParquet(
134+
Expressions.stIntersects("geom", createBoundingBox(1, 3, 1, 3)), nonEmptyBlockMetadata);
135+
assertThat(shouldRead).isTrue();
136+
137+
shouldRead =
138+
shouldReadParquet(
139+
Expressions.stIntersects("geom", createBoundingBox(2, 4, 2, 4)), nonEmptyBlockMetadata);
140+
assertThat(shouldRead).isTrue();
141+
}
142+
143+
@Test
144+
public void testNotHitNonEmptyFile() {
145+
boolean shouldRead =
146+
shouldReadParquet(
147+
Expressions.stIntersects("geom", createBoundingBox(0, 0, 1, 1)), nonEmptyBlockMetadata);
148+
assertThat(shouldRead).isFalse();
149+
150+
shouldRead =
151+
shouldReadParquet(
152+
Expressions.stIntersects("geom", createBoundingBox(4, 4, 5, 5)), nonEmptyBlockMetadata);
153+
assertThat(shouldRead).isFalse();
154+
}
155+
156+
@Test
157+
public void testHitEmptyFile() {
158+
// We cannot skip row groups without geospatial bounding box.
159+
boolean shouldRead =
160+
shouldReadParquet(
161+
Expressions.stIntersects("geom", createBoundingBox(1, 3, 2, 4)), emptyBlockMetadata);
162+
assertThat(shouldRead).isTrue();
163+
164+
shouldRead =
165+
shouldReadParquet(
166+
Expressions.stIntersects("geom", createBoundingBox(1, 2, 3, 4)), emptyBlockMetadata);
167+
assertThat(shouldRead).isTrue();
168+
}
169+
170+
@Test
171+
public void testNotHitNullFile() {
172+
boolean shouldRead =
173+
shouldReadParquet(
174+
Expressions.stIntersects("geom", createBoundingBox(1, 3, 2, 4)), nullBlockMetadata);
175+
assertThat(shouldRead).isFalse();
176+
177+
shouldRead =
178+
shouldReadParquet(
179+
Expressions.stIntersects("geom", createBoundingBox(1, 2, 3, 4)), nullBlockMetadata);
180+
assertThat(shouldRead).isFalse();
181+
}
182+
183+
private boolean shouldReadParquet(Expression expression, ParquetFileMetadata metadata) {
184+
return new ParquetMetricsRowGroupFilter(SCHEMA, expression, true)
185+
.shouldRead(metadata.schema, metadata.blockMetaData);
186+
}
187+
188+
private static BoundingBox createBoundingBox(double minX, double minY, double maxX, double maxY) {
189+
return new BoundingBox(
190+
GeospatialBound.createXY(minX, minY), GeospatialBound.createXY(maxX, maxY));
191+
}
192+
}

0 commit comments

Comments
 (0)