Skip to content

Commit 8c089c5

Browse files
authored
Merge pull request #264 from felix-schott/100_extract
add 'extract' command with column and spatial subsetting
2 parents 070cb37 + 65a4807 commit 8c089c5

18 files changed

Lines changed: 1479 additions & 30 deletions

cmd/gpq/command/command.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ var CLI struct {
1515
Convert ConvertCmd `cmd:"" help:"Convert data from one format to another."`
1616
Validate ValidateCmd `cmd:"" help:"Validate a GeoParquet file."`
1717
Describe DescribeCmd `cmd:"" help:"Describe a GeoParquet file."`
18+
Extract ExtractCmd `cmd:"" help:"Extract columns by name or rows by spatial subsetting."`
1819
Version VersionCmd `cmd:"" help:"Print the version of this program."`
1920
}
2021

@@ -49,3 +50,11 @@ func readerFromInput(input string) (storage.ReaderAtSeeker, error) {
4950

5051
return os.Open(input)
5152
}
53+
54+
// hasStdin reports whether os.Stdin currently advertises a non-zero size.
// A failing Stat call is treated the same as "nothing on stdin".
//
// NOTE(review): pipes may report a size of zero on some platforms, in which
// case piped input would not be detected here — confirm against the callers'
// expectations before relying on this for pipeline use.
func hasStdin() bool {
	info, statErr := os.Stdin.Stat()
	return statErr == nil && info.Size() > 0
}

cmd/gpq/command/convert.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type ConvertCmd struct {
3232
To string `help:"Output file format. Possible values: ${enum}." enum:"auto, geojson, geoparquet" default:"auto"`
3333
Min int `help:"Minimum number of features to consider when building a schema." default:"10"`
3434
Max int `help:"Maximum number of features to consider when building a schema." default:"100"`
35-
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet withtout metadata." default:"geometry"`
35+
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet without metadata." default:"geometry"`
3636
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd" default:"zstd"`
3737
RowGroupLength int `help:"Maximum number of rows per group when writing Parquet."`
3838
}
@@ -100,14 +100,6 @@ func getFormatType(resource string) FormatType {
100100
return UnknownType
101101
}
102102

103-
func hasStdin() bool {
104-
stats, err := os.Stdin.Stat()
105-
if err != nil {
106-
return false
107-
}
108-
return stats.Size() > 0
109-
}
110-
111103
func (c *ConvertCmd) Run() error {
112104
inputSource := c.Input
113105
outputSource := c.Output

cmd/gpq/command/extract.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package command
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"strings"
8+
9+
"github.com/apache/arrow/go/v16/arrow"
10+
"github.com/planetlabs/gpq/internal/geo"
11+
"github.com/planetlabs/gpq/internal/geoparquet"
12+
)
13+
14+
// ExtractCmd carries the positional arguments and flags of the "extract"
// command, which copies a GeoParquet file while optionally keeping only a
// subset of its columns and/or the rows that match a bounding box.
type ExtractCmd struct {
	// Input is the source path or URL; per its help text an empty value means stdin.
	Input string `arg:"" optional:"" name:"input" help:"Input file path or URL. If not provided, input is read from stdin."`

	// Output is the destination path; an empty value means stdout.
	Output string `arg:"" optional:"" name:"output" help:"Output file. If not provided, output is written to stdout." type:"path"`

	// Bbox is the spatial filter, written as "x_min,y_min,x_max,y_max".
	Bbox string `help:"Filter features by intersection of their bounding box with the provided bounding box (in x_min,y_min,x_max,y_max format)."`

	// DropCols is a comma-separated list of columns to exclude.
	// It is mutually exclusive with KeepOnlyCols.
	DropCols string `help:"Drop the provided columns. Provide a comma-separated string of column names to be excluded. Do not use together with --keep-only-cols."`

	// KeepOnlyCols is a comma-separated list of the only columns to retain.
	// It is mutually exclusive with DropCols.
	KeepOnlyCols string `help:"Keep only the provided columns. Provide a comma-separated string of columns to be kept. Do not use together with --drop-cols."`
}
21+
22+
func (c *ExtractCmd) Run() error {
23+
24+
// validate and transform inputs
25+
26+
inputSource := c.Input
27+
outputSource := c.Output
28+
29+
if c.Input == "" && hasStdin() {
30+
outputSource = inputSource
31+
inputSource = ""
32+
}
33+
34+
input, inputErr := readerFromInput(inputSource)
35+
if inputErr != nil {
36+
return NewCommandError("trouble getting a reader from %q: %w", c.Input, inputErr)
37+
}
38+
39+
var output *os.File
40+
if outputSource == "" {
41+
output = os.Stdout
42+
} else {
43+
o, createErr := os.Create(outputSource)
44+
if createErr != nil {
45+
return NewCommandError("failed to open %q for writing: %w", outputSource, createErr)
46+
}
47+
defer o.Close()
48+
output = o
49+
}
50+
51+
// prepare input reader (ignore certain columns if asked to - DropCols/KeepOnlyCols)
52+
config := &geoparquet.ReaderConfig{Reader: input}
53+
54+
parquetFileReader, err := geoparquet.NewParquetFileReader(config)
55+
if err != nil {
56+
return NewCommandError("could not get ParquetFileReader: %w", err)
57+
}
58+
59+
arrowFileReader, err := geoparquet.NewArrowFileReader(config, parquetFileReader)
60+
if err != nil {
61+
return NewCommandError("could not get ArrowFileReader: %w", err)
62+
}
63+
64+
geoMetadata, err := geoparquet.GetMetadataFromFileReader(parquetFileReader)
65+
if err != nil {
66+
return NewCommandError("could not get geo metadata from file reader: %w", err)
67+
}
68+
69+
arrowSchema, schemaErr := arrowFileReader.Schema()
70+
if schemaErr != nil {
71+
return NewCommandError("trouble getting arrow schema: %w", schemaErr)
72+
}
73+
74+
// projection pushdown - column filtering
75+
var columnIndices []int = nil
76+
77+
var includeColumns []string
78+
var excludeColumns []string
79+
if c.DropCols != "" {
80+
excludeColumns = strings.Split(c.DropCols, ",")
81+
}
82+
if c.KeepOnlyCols != "" {
83+
includeColumns = strings.Split(c.KeepOnlyCols, ",")
84+
}
85+
86+
excludeColNamesProvided := len(excludeColumns) > 0
87+
includeColNamesProvided := len(includeColumns) > 0
88+
89+
if excludeColNamesProvided || includeColNamesProvided {
90+
if excludeColNamesProvided == includeColNamesProvided {
91+
return NewCommandError("please pass only one of DropColumns/KeepOnlyColumns")
92+
}
93+
94+
if includeColNamesProvided {
95+
columnIndices, err = geoparquet.GetColumnIndices(includeColumns, arrowSchema)
96+
if err != nil {
97+
return NewCommandError("trouble inferring column names (positive selection): %w", err)
98+
}
99+
}
100+
101+
if excludeColNamesProvided {
102+
columnIndices, err = geoparquet.GetColumnIndicesByDifference(excludeColumns, arrowSchema)
103+
if err != nil {
104+
return NewCommandError("trouble inferring column names (negative selection): %w", err)
105+
}
106+
}
107+
}
108+
config.Columns = columnIndices
109+
110+
// predicate pushdown - spatial row filtering
111+
var rowGroups []int = nil
112+
113+
// parse bbox filter argument into geo.Bbox struct if applicable
114+
inputBbox, err := geo.NewBboxFromString(c.Bbox)
115+
if err != nil {
116+
return NewCommandError("trouble getting bbox from input string: %w", err)
117+
}
118+
var bboxCol *geoparquet.BboxColumn
119+
if inputBbox != nil {
120+
bboxCol = geoparquet.GetBboxColumn(parquetFileReader.MetaData().Schema, geoMetadata)
121+
122+
if bboxCol.Name != "" { // if there is a bbox col in the file
123+
rowGroups, err = geoparquet.GetRowGroupsByBbox(parquetFileReader, bboxCol, inputBbox)
124+
if err != nil {
125+
return NewCommandError("trouble scanning row group metadata: %w", err)
126+
}
127+
}
128+
}
129+
130+
config.RowGroups = rowGroups
131+
132+
// create new record reader - based on the config values for
133+
// Columns and RowGroups it will only read a subset of
134+
// columns and row groups
135+
ctx := context.Background()
136+
137+
recordReader, err := geoparquet.NewRecordReader(ctx, arrowFileReader, geoMetadata, columnIndices, rowGroups)
138+
if err != nil {
139+
return NewCommandError("trouble creating geoparquet record reader: %w", err)
140+
}
141+
defer recordReader.Close()
142+
143+
// prepare output writer
144+
recordWriter, rwErr := geoparquet.NewRecordWriter(&geoparquet.WriterConfig{
145+
Writer: output,
146+
Metadata: recordReader.Metadata(),
147+
ArrowSchema: recordReader.ArrowSchema(),
148+
})
149+
if rwErr != nil {
150+
return NewCommandError("trouble getting record writer: %w", rwErr)
151+
}
152+
defer recordWriter.Close()
153+
154+
// read and write records in loop
155+
for {
156+
record, readErr := recordReader.Read()
157+
if readErr == io.EOF {
158+
break
159+
}
160+
if readErr != nil {
161+
return readErr
162+
}
163+
164+
// filter by bbox if asked to
165+
var filteredRecord *arrow.Record
166+
if inputBbox != nil && bboxCol != nil {
167+
var filterErr error
168+
filteredRecord, filterErr = geoparquet.FilterRecordBatchByBbox(ctx, &record, inputBbox, bboxCol)
169+
if filterErr != nil {
170+
return NewCommandError("trouble filtering record batch by bbox: %w", filterErr)
171+
}
172+
} else {
173+
filteredRecord = &record
174+
}
175+
176+
if err := recordWriter.Write(*filteredRecord); err != nil {
177+
return err
178+
}
179+
}
180+
return nil
181+
}

cmd/gpq/command/extract_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package command_test
2+
3+
import (
4+
"bytes"
5+
6+
"github.com/apache/arrow/go/v16/parquet/file"
7+
"github.com/planetlabs/gpq/cmd/gpq/command"
8+
"github.com/planetlabs/gpq/internal/geoparquet"
9+
)
10+
11+
func (s *Suite) TestExtractDropCols() {
12+
cmd := &command.ExtractCmd{
13+
Input: "../../../internal/testdata/cases/example-v1.0.0.parquet",
14+
DropCols: "pop_est,iso_a3",
15+
}
16+
s.Require().NoError(cmd.Run())
17+
18+
data := s.readStdout()
19+
20+
fileReader, err := file.NewParquetReader(bytes.NewReader(data))
21+
s.Require().NoError(err)
22+
defer fileReader.Close()
23+
24+
s.Equal(int64(5), fileReader.NumRows())
25+
26+
s.Require().NoError(err)
27+
s.Equal(4, fileReader.MetaData().Schema.NumColumns())
28+
29+
recordReader, err := geoparquet.NewRecordReaderFromConfig(&geoparquet.ReaderConfig{
30+
Reader: bytes.NewReader(data),
31+
})
32+
s.Require().NoError(err)
33+
defer recordReader.Close()
34+
35+
record, readErr := recordReader.Read()
36+
s.Require().NoError(readErr)
37+
s.Assert().Equal(int64(4), record.NumCols())
38+
}
39+
40+
func (s *Suite) TestExtractKeepOnlyCols() {
41+
cmd := &command.ExtractCmd{
42+
Input: "../../../internal/testdata/cases/example-v1.1.0.parquet",
43+
KeepOnlyCols: "geometry,pop_est,iso_a3",
44+
}
45+
s.Require().NoError(cmd.Run())
46+
47+
data := s.readStdout()
48+
49+
fileReader, err := file.NewParquetReader(bytes.NewReader(data))
50+
s.Require().NoError(err)
51+
defer fileReader.Close()
52+
53+
s.Equal(int64(5), fileReader.NumRows())
54+
55+
s.Require().NoError(err)
56+
s.Equal(3, fileReader.MetaData().Schema.NumColumns())
57+
58+
recordReader, err := geoparquet.NewRecordReaderFromConfig(&geoparquet.ReaderConfig{
59+
Reader: bytes.NewReader(data),
60+
})
61+
s.Require().NoError(err)
62+
defer recordReader.Close()
63+
64+
record, readErr := recordReader.Read()
65+
s.Require().NoError(readErr)
66+
s.Assert().Equal(int64(3), record.NumCols())
67+
}
68+
69+
// Since the 1.1.0 parquet file includes a bbox column, we expect the bbox column to be used for spatial filtering.
70+
func (s *Suite) TestExtractBbox110() {
71+
cmd := &command.ExtractCmd{
72+
Input: "../../../internal/testdata/cases/example-v1.1.0.parquet",
73+
Bbox: "34,-7,36,-6",
74+
}
75+
s.Require().NoError(cmd.Run())
76+
77+
data := s.readStdout()
78+
79+
recordReader, err := geoparquet.NewRecordReaderFromConfig(&geoparquet.ReaderConfig{
80+
Reader: bytes.NewReader(data),
81+
})
82+
s.Require().NoError(err)
83+
defer recordReader.Close()
84+
85+
// we expect only one row, namely Tanzania
86+
s.Require().Equal(int64(1), recordReader.NumRows())
87+
88+
record, readErr := recordReader.Read()
89+
s.Require().NoError(readErr)
90+
s.Assert().Equal(int64(7), record.NumCols())
91+
s.Assert().Equal(int64(1), record.NumRows())
92+
93+
country := record.Column(recordReader.Schema().ColumnIndexByName("name")).ValueStr(0)
94+
s.Assert().Equal("Tanzania", country)
95+
}
96+
97+
// Since the 1.1.0 parquet file includes a bbox column and is partitioned into spatially ordered row groups,
98+
// we expect the bbox column row group statistic to be used for spatial pushdown filtering.
99+
func (s *Suite) TestExtractBbox110Partitioned() {
100+
cmd := &command.ExtractCmd{
101+
Input: "../../../internal/testdata/cases/example-v1.1.0-partitioned.parquet",
102+
Bbox: "34,-7,36,-6",
103+
}
104+
s.Require().NoError(cmd.Run())
105+
106+
data := s.readStdout()
107+
108+
recordReader, err := geoparquet.NewRecordReaderFromConfig(&geoparquet.ReaderConfig{
109+
Reader: bytes.NewReader(data),
110+
})
111+
s.Require().NoError(err)
112+
defer recordReader.Close()
113+
114+
// we expect only one row, namely Tanzania
115+
s.Require().Equal(int64(1), recordReader.NumRows())
116+
117+
record, readErr := recordReader.Read()
118+
s.Require().NoError(readErr)
119+
s.Assert().Equal(int64(8), record.NumCols())
120+
s.Assert().Equal(int64(1), record.NumRows())
121+
122+
country := record.Column(recordReader.Schema().ColumnIndexByName("name")).ValueStr(0)
123+
s.Assert().Equal("Tanzania", country)
124+
}
125+
126+
// Since the 1.0.0 parquet file doesn't have a bbox column, we expect the bbox column to be calculated on the fly.
127+
func (s *Suite) TestExtractBbox100() {
128+
cmd := &command.ExtractCmd{
129+
Input: "../../../internal/testdata/cases/example-v1.0.0.parquet",
130+
Bbox: "34,-7,36,-6",
131+
}
132+
s.Require().NoError(cmd.Run())
133+
134+
data := s.readStdout()
135+
136+
recordReader, err := geoparquet.NewRecordReaderFromConfig(&geoparquet.ReaderConfig{
137+
Reader: bytes.NewReader(data),
138+
})
139+
s.Require().NoError(err)
140+
defer recordReader.Close()
141+
142+
// we expect only one row, namely Tanzania
143+
s.Require().Equal(int64(1), recordReader.NumRows())
144+
145+
record, readErr := recordReader.Read()
146+
s.Require().NoError(readErr)
147+
s.Assert().Equal(int64(6), record.NumCols())
148+
s.Assert().Equal(int64(1), record.NumRows())
149+
150+
country := record.Column(recordReader.Schema().ColumnIndexByName("name")).ValueStr(0)
151+
s.Assert().Equal("Tanzania", country)
152+
}

0 commit comments

Comments
 (0)