Skip to content

Commit c8ee812

Browse files
committed
fix: Bubble up errors for cursors
1 parent 6a1cc9f commit c8ee812

15 files changed

+91
-46
lines changed

mock/reads_resultset.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mock
22

33
import (
4+
"fmt"
5+
46
"github.com/influxdata/influxdb/v2/models"
57
"github.com/influxdata/influxdb/v2/pkg/data/gen"
68
"github.com/influxdata/influxdb/v2/storage/reads"
@@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool {
6163
return g.sg.Next() && (g.max == 0 || remain > 0)
6264
}
6365

64-
func (g *GeneratorResultSet) Cursor() cursors.Cursor {
66+
func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) {
6567
switch g.sg.FieldType() {
6668
case models.Float:
6769
g.f.tv = g.sg.TimeValuesGenerator()
@@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor {
7981
g.b.tv = g.sg.TimeValuesGenerator()
8082
g.cur = &g.b
8183
default:
82-
panic("unreachable")
84+
return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType())
8385
}
8486

85-
return g.cur
87+
return g.cur, nil
8688
}
8789

8890
func copyTags(dst, src models.Tags) models.Tags {

storage/flux/reader.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result
194194

195195
READ:
196196
for rs.Next() {
197-
cur = rs.Cursor()
197+
cur, err := rs.Cursor()
198+
if err != nil {
199+
return err
200+
}
198201
if cur == nil {
199202
// no data for series key + field combination
200203
continue
@@ -331,7 +334,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe
331334
READ:
332335
for gc != nil {
333336
for gc.Next() {
334-
cur = gc.Cursor()
337+
cur, _ = gc.Cursor()
335338
if cur != nil {
336339
break
337340
}
@@ -740,7 +743,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor
740743

741744
READ:
742745
for rs.Next() {
743-
cur = rs.Cursor()
746+
cur, err = rs.Cursor()
747+
if err != nil {
748+
return err
749+
}
744750
if cur == nil {
745751
// no data for series key + field combination
746752
continue

storage/flux/table.gen.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ func (t *floatGroupTable) advanceCursor() bool {
970970
t.cur.Close()
971971
t.cur = nil
972972
for t.gc.Next() {
973-
cur := t.gc.Cursor()
973+
cur, _ := t.gc.Cursor()
974974
if cur == nil {
975975
continue
976976
}
@@ -1954,7 +1954,7 @@ func (t *integerGroupTable) advanceCursor() bool {
19541954
t.cur.Close()
19551955
t.cur = nil
19561956
for t.gc.Next() {
1957-
cur := t.gc.Cursor()
1957+
cur, _ := t.gc.Cursor()
19581958
if cur == nil {
19591959
continue
19601960
}
@@ -2935,7 +2935,7 @@ func (t *unsignedGroupTable) advanceCursor() bool {
29352935
t.cur.Close()
29362936
t.cur = nil
29372937
for t.gc.Next() {
2938-
cur := t.gc.Cursor()
2938+
cur, _ := t.gc.Cursor()
29392939
if cur == nil {
29402940
continue
29412941
}
@@ -3860,7 +3860,7 @@ func (t *stringGroupTable) advanceCursor() bool {
38603860
t.cur.Close()
38613861
t.cur = nil
38623862
for t.gc.Next() {
3863-
cur := t.gc.Cursor()
3863+
cur, _ := t.gc.Cursor()
38643864
if cur == nil {
38653865
continue
38663866
}
@@ -4785,7 +4785,7 @@ func (t *booleanGroupTable) advanceCursor() bool {
47854785
t.cur.Close()
47864786
t.cur = nil
47874787
for t.gc.Next() {
4788-
cur := t.gc.Cursor()
4788+
cur, _ := t.gc.Cursor()
47894789
if cur == nil {
47904790
continue
47914791
}

storage/flux/table.gen.go.tmpl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,7 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
10001000
t.cur.Close()
10011001
t.cur = nil
10021002
for t.gc.Next() {
1003-
cur := t.gc.Cursor()
1003+
cur, _ := t.gc.Cursor()
10041004
if cur == nil {
10051005
continue
10061006
}

storage/reads/aggregate_resultset.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
9797
agg := r.req.Aggregate[0]
9898
every := r.req.WindowEvery
9999
offset := r.req.Offset
100-
cursor := r.arrayCursors.createCursor(seriesRow)
100+
cursor, err := r.arrayCursors.createCursor(seriesRow)
101+
if err != nil {
102+
return nil, err
103+
}
101104

102105
var everyDur values.Duration
103106
var offsetDur values.Duration
@@ -132,8 +135,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
132135
}
133136
}
134137

135-
func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
136-
return r.cursor
138+
func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) {
139+
return r.cursor, nil
137140
}
138141

139142
func (r *windowAggregateResultSet) Close() {

storage/reads/aggregate_resultset_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package reads_test
22

33
import (
44
"context"
5+
"github.com/stretchr/testify/require"
56
"reflect"
67
"testing"
78
"time"
@@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) {
196197
if !resultSet.Next() {
197198
t.Fatalf("unexpected: resultSet could not advance")
198199
}
199-
cursor := resultSet.Cursor()
200+
cursor, err := resultSet.Cursor()
201+
require.NoError(t, err, "create cursor failed")
200202
if cursor == nil {
201203
t.Fatalf("unexpected: cursor was nil")
202204
}
@@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) {
238240
if !resultSet.Next() {
239241
t.Fatalf("unexpected: resultSet could not advance")
240242
}
241-
cursor := resultSet.Cursor()
243+
cursor, err := resultSet.Cursor()
244+
require.NoError(t, err, "create cursor failed")
242245
if cursor == nil {
243246
t.Fatalf("unexpected: cursor was nil")
244247
}

storage/reads/array_cursor.gen.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) {
4545
default:
4646
return nil, &errors2.Error{
4747
Code: errors2.EInvalid,
48-
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
48+
Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)),
4949
}
5050
}
5151
}
@@ -74,7 +74,7 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (curs
7474
default:
7575
return nil, &errors2.Error{
7676
Code: errors2.EInvalid,
77-
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
77+
Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)),
7878
}
7979
}
8080
}
@@ -103,7 +103,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (curso
103103
default:
104104
return nil, &errors2.Error{
105105
Code: errors2.EInvalid,
106-
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
106+
Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)),
107107
}
108108
}
109109
}
@@ -129,7 +129,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (curs
129129
default:
130130
return nil, &errors2.Error{
131131
Code: errors2.EInvalid,
132-
Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)),
132+
Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)),
133133
}
134134
}
135135
}

storage/reads/array_cursor.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package reads
33
import (
44
"context"
55
"fmt"
6+
67
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
78

89
"github.com/influxdata/flux/interval"
@@ -104,7 +105,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool)
104105
return m
105106
}
106107

107-
func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
108+
func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) {
108109
m.req.Name = row.Name
109110
m.req.Tags = row.SeriesTags
110111
m.req.Field = row.Field
@@ -123,26 +124,29 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
123124
}
124125

125126
if cur == nil || err != nil {
126-
return nil
127+
return nil, err
127128
}
128129

129130
switch c := cur.(type) {
130131
case cursors.IntegerArrayCursor:
131132
m.cursors.i.reset(c, row.Query, cond)
132-
return &m.cursors.i
133+
return &m.cursors.i, nil
133134
case cursors.FloatArrayCursor:
134135
m.cursors.f.reset(c, row.Query, cond)
135-
return &m.cursors.f
136+
return &m.cursors.f, nil
136137
case cursors.UnsignedArrayCursor:
137138
m.cursors.u.reset(c, row.Query, cond)
138-
return &m.cursors.u
139+
return &m.cursors.u, nil
139140
case cursors.StringArrayCursor:
140141
m.cursors.s.reset(c, row.Query, cond)
141-
return &m.cursors.s
142+
return &m.cursors.s, nil
142143
case cursors.BooleanArrayCursor:
143144
m.cursors.b.reset(c, row.Query, cond)
144-
return &m.cursors.b
145+
return &m.cursors.b, nil
145146
default:
146-
return nil
147+
return nil, &errors2.Error{
148+
Code: errors2.EInvalid,
149+
Msg: fmt.Sprintf("unsupported cursor type: %s", arrayCursorType(cur)),
150+
}
147151
}
148152
}

storage/reads/array_cursor_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -2227,7 +2227,9 @@ func TestMultiShardArrayCursor(t *testing.T) {
22272227
row := SeriesRow{Query: iter}
22282228
ctx := context.Background()
22292229
msac := newMultiShardArrayCursors(ctx, models.MinNanoTime, models.MaxNanoTime, true)
2230-
cur, ok := msac.createCursor(row).(cursors.IntegerArrayCursor)
2230+
cursor, err := msac.createCursor(row)
2231+
require.NoError(t, err, "create cursor failed")
2232+
cur, ok := cursor.(cursors.IntegerArrayCursor)
22312233
require.Truef(t, ok, "Expected IntegerArrayCursor")
22322234

22332235
ia := cur.Next()

storage/reads/group_resultset.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ func (g *groupResultSet) Next() GroupCursor {
121121
// the time range of the query.
122122
func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
123123
// TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys.
124-
cur := g.arrayCursors.createCursor(*row)
124+
cur, err := g.arrayCursors.createCursor(*row)
125+
if err != nil {
126+
return false
127+
}
125128
var ts []int64
126129
switch c := cur.(type) {
127130
case cursors.IntegerArrayCursor:
@@ -302,15 +305,18 @@ func (c *groupNoneCursor) Next() bool {
302305
}
303306

304307
func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
305-
cur = c.arrayCursors.createCursor(c.row)
308+
cur, err = c.arrayCursors.createCursor(c.row)
309+
if err != nil {
310+
return nil, err
311+
}
306312
if c.agg != nil {
307313
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
308314
}
309315
return cur, err
310316
}
311317

312-
func (c *groupNoneCursor) Cursor() cursors.Cursor {
313-
return c.cursor
318+
func (c *groupNoneCursor) Cursor() (cursors.Cursor, error) {
319+
return c.cursor, nil
314320
}
315321

316322
type groupByCursor struct {
@@ -350,15 +356,18 @@ func (c *groupByCursor) Next() bool {
350356
}
351357

352358
func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
353-
cur = c.arrayCursors.createCursor(seriesRow)
359+
cur, err = c.arrayCursors.createCursor(seriesRow)
360+
if err != nil {
361+
return nil, err
362+
}
354363
if c.agg != nil {
355364
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
356365
}
357366
return cur, err
358367
}
359368

360-
func (c *groupByCursor) Cursor() cursors.Cursor {
361-
return c.cursor
369+
func (c *groupByCursor) Cursor() (cursors.Cursor, error) {
370+
return c.cursor, nil
362371
}
363372

364373
func (c *groupByCursor) Stats() cursors.CursorStats {

storage/reads/resultset.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
type multiShardCursors interface {
11-
createCursor(row SeriesRow) cursors.Cursor
11+
createCursor(row SeriesRow) (cursors.Cursor, error)
1212
}
1313

1414
type resultSet struct {
@@ -57,7 +57,7 @@ func (r *resultSet) Next() bool {
5757
return true
5858
}
5959

60-
func (r *resultSet) Cursor() cursors.Cursor {
60+
func (r *resultSet) Cursor() (cursors.Cursor, error) {
6161
return r.arrayCursors.createCursor(r.seriesRow)
6262
}
6363

storage/reads/resultset_lineprotocol.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) {
3232
line = append(line, ' ')
3333
line = append(line, field...)
3434
line = append(line, '=')
35-
err = cursorToLineProtocol(wr, line, rs.Cursor())
35+
cursor, err := rs.Cursor()
36+
if err != nil {
37+
return err
38+
}
39+
err = cursorToLineProtocol(wr, line, cursor)
3640
if err != nil {
3741
return err
3842
}

storage/reads/store.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type ResultSet interface {
1515
Next() bool
1616

1717
// Cursor returns the most recent cursor after a call to Next.
18-
Cursor() cursors.Cursor
18+
Cursor() (cursors.Cursor, error)
1919

2020
// Tags returns the tags for the most recent cursor after a call to Next.
2121
Tags() models.Tags
@@ -47,7 +47,7 @@ type GroupCursor interface {
4747
Next() bool
4848

4949
// Cursor returns the most recent cursor after a call to Next.
50-
Cursor() cursors.Cursor
50+
Cursor() (cursors.Cursor, error)
5151

5252
// Tags returns the tags for the most recent cursor after a call to Next.
5353
Tags() models.Tags

storage/reads/store_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
119119
for rs.Next() {
120120
fmt.Fprint(wr, "series: ")
121121
tagsToString(wr, rs.Tags())
122-
cur := rs.Cursor()
122+
cur, _ := rs.Cursor()
123123

124124
if po.SkipNilCursor && cur == nil {
125125
continue

0 commit comments

Comments
 (0)