Skip to content

Commit 24c6526

Browse files
committed
fix(telegraf): support pagination parameters when listing
1 parent 5a3419c commit 24c6526

File tree

14 files changed

+166
-87
lines changed

14 files changed

+166
-87
lines changed

cmd/influx/telegraf.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ func (b *cmdTelegrafBuilder) listRunE(cmd *cobra.Command, args []string) error {
9696
return b.writeTelegrafConfig(cfg)
9797
}
9898

99-
cfgs, _, err := svc.FindTelegrafConfigs(context.Background(), influxdb.TelegrafConfigFilter{
100-
OrgID: &orgID,
101-
UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ResourceType: influxdb.TelegrafsResourceType},
102-
})
99+
cfgs, _, err := svc.FindTelegrafConfigs(context.Background(),
100+
influxdb.TelegrafConfigFilter{
101+
OrgID: &orgID,
102+
})
103103
if err != nil {
104104
return err
105105
}

dbrp/service.go

+25-23
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"bytes"
2929
"context"
3030
"encoding/json"
31-
"fmt"
3231

3332
"github.com/influxdata/influxdb/v2"
3433
"github.com/influxdata/influxdb/v2/kv"
@@ -160,38 +159,39 @@ func (s *Service) unsetDefault(tx kv.Tx, compKey []byte) error {
160159
// getFirstBut returns the first element in the db/rp index (not accounting for the `skipID`).
161160
// If the length of the returned ID is 0, it means no element was found.
162161
// The skip value is useful, for instance, if one wants to delete an element based on the result of this operation.
163-
func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) ([]byte, error) {
164-
stop := fmt.Errorf("stop")
165-
var next []byte
166-
if err := s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) error {
162+
func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) (next []byte, err error) {
163+
err = s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) (bool, error) {
167164
if bytes.Equal(skipID, k) {
168-
return nil
165+
return true, nil
169166
}
167+
170168
next = k
171-
return stop
172-
}); err != nil && err != stop {
173-
return nil, ErrInternalService(err)
174-
}
175-
return next, nil
169+
170+
return false, nil
171+
})
172+
return
176173
}
177174

178175
// isDBRPUnique verifies if the triple orgID-database-retention-policy is unique.
179176
func (s *Service) isDBRPUnique(ctx context.Context, m influxdb.DBRPMappingV2) error {
180177
return s.store.View(ctx, func(tx kv.Tx) error {
181-
return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) error {
178+
return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) (bool, error) {
182179
dbrp := &influxdb.DBRPMappingV2{}
183180
if err := json.Unmarshal(v, dbrp); err != nil {
184-
return ErrInternalService(err)
181+
return false, ErrInternalService(err)
185182
}
183+
186184
if dbrp.ID == m.ID {
187185
// Corner case.
188186
// This is the very same DBRP, just skip it!
189-
return nil
187+
return true, nil
190188
}
189+
191190
if dbrp.RetentionPolicy == m.RetentionPolicy {
192-
return ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists")
191+
return false, ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists")
193192
}
194-
return nil
193+
194+
return true, nil
195195
})
196196
})
197197
}
@@ -254,22 +254,23 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
254254
}
255255

256256
ms := []*influxdb.DBRPMappingV2{}
257-
add := func(tx kv.Tx) func(k, v []byte) error {
258-
return func(k, v []byte) error {
257+
add := func(tx kv.Tx) func(k, v []byte) (bool, error) {
258+
return func(k, v []byte) (bool, error) {
259259
m := influxdb.DBRPMappingV2{}
260260
if err := json.Unmarshal(v, &m); err != nil {
261-
return ErrInternalService(err)
261+
return false, ErrInternalService(err)
262262
}
263263
// Updating the Default field must be done before filtering.
264264
defID, err := get(tx, m.OrganizationID, m.Database)
265265
if err != nil {
266-
return ErrInternalService(err)
266+
return false, ErrInternalService(err)
267267
}
268+
268269
m.Default = m.ID == *defID
269270
if filterFunc(&m, filter) {
270271
ms = append(ms, &m)
271272
}
272-
return nil
273+
return true, nil
273274
}
274275
}
275276

@@ -303,7 +304,8 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
303304
if err != nil {
304305
return ErrInternalService(err)
305306
}
306-
return add(tx)(defID, v)
307+
_, err = add(tx)(defID, v)
308+
return err
307309
}
308310
}
309311
return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx))
@@ -318,7 +320,7 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
318320
}
319321

320322
for k, v := cur.First(); k != nil; k, v = cur.Next() {
321-
if err := add(tx)(k, v); err != nil {
323+
if _, err := add(tx)(k, v); err != nil {
322324
return err
323325
}
324326
}

http/telegraf.go

+3-12
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,8 @@ func (h *TelegrafHandler) handleGetTelegraf(w http.ResponseWriter, r *http.Reque
278278

279279
func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb.TelegrafConfigFilter, error) {
280280
f := &influxdb.TelegrafConfigFilter{}
281-
urm, err := decodeUserResourceMappingFilter(ctx, r, influxdb.TelegrafsResourceType)
282-
if err == nil {
283-
f.UserResourceMappingFilter = *urm
284-
}
285-
286281
q := r.URL.Query()
282+
287283
if orgIDStr := q.Get("orgID"); orgIDStr != "" {
288284
orgID, err := influxdb.IDFromString(orgIDStr)
289285
if err != nil {
@@ -297,7 +293,8 @@ func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb
297293
} else if orgNameStr := q.Get("org"); orgNameStr != "" {
298294
f.Organization = &orgNameStr
299295
}
300-
return f, err
296+
297+
return f, nil
301298
}
302299

303300
// handlePostTelegraf is the HTTP handler for the POST /api/v2/telegrafs route.
@@ -445,12 +442,6 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f influxdb.Te
445442
if f.Organization != nil {
446443
params = append(params, [2]string{"organization", *f.Organization})
447444
}
448-
if f.ResourceID != 0 {
449-
params = append(params, [2]string{"resourceID", f.ResourceID.String()})
450-
}
451-
if f.UserID != 0 {
452-
params = append(params, [2]string{"userID", f.UserID.String()})
453-
}
454445

455446
var resp struct {
456447
Configs []*influxdb.TelegrafConfig `json:"configurations"`

kv/index.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Buck
222222

223223
for i, value := range values {
224224
if value != nil {
225-
if err := visit(keys[i], value); err != nil {
225+
if cont, err := visit(keys[i], value); !cont || err != nil {
226226
return err
227227
}
228228
}
@@ -390,9 +390,9 @@ func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, err
390390
return err
391391
}
392392

393-
return WalkCursor(ctx, cursor, func(k, v []byte) error {
393+
return WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) {
394394
kvs = append(kvs, [2][]byte{k, v})
395-
return nil
395+
return true, nil
396396
})
397397
})
398398
}

kv/migration/migration.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,24 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb
253253
return err
254254
}
255255

256-
return kv.WalkCursor(ctx, cursor, func(k, v []byte) error {
256+
return kv.WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) {
257257
var id influxdb.ID
258258
if err := id.Decode(k); err != nil {
259-
return fmt.Errorf("decoding migration id: %w", err)
259+
return false, fmt.Errorf("decoding migration id: %w", err)
260260
}
261261

262262
var migration Migration
263263
if err := json.Unmarshal(v, &migration); err != nil {
264-
return err
264+
return false, err
265265
}
266266

267267
idx := int(id) - 1
268268
if idx >= len(m.Specs) {
269-
return fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
269+
return false, fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
270270
}
271271

272272
if spec := m.Specs[idx]; spec.MigrationName() != migration.Name {
273-
return fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
273+
return false, fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
274274
}
275275

276276
if migration.FinishedAt != nil {
@@ -279,7 +279,7 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb
279279

280280
fn(id, migration)
281281

282-
return nil
282+
return true, nil
283283
})
284284
}); err != nil {
285285
return fmt.Errorf("reading migrations: %w", err)

kv/store.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func WithCursorLimit(limit int) CursorOption {
234234

235235
// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
236236
// which are found in the index bucket for a provided foreign key.
237-
type VisitFunc func(k, v []byte) error
237+
type VisitFunc func(k, v []byte) (bool, error)
238238

239239
// WalkCursor consumers the forward cursor call visit for each k/v pair found
240240
func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error) {
@@ -245,7 +245,7 @@ func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err
245245
}()
246246

247247
for k, v := cursor.Next(); k != nil; k, v = cursor.Next() {
248-
if err := visit(k, v); err != nil {
248+
if cont, err := visit(k, v); !cont || err != nil {
249249
return err
250250
}
251251

kv/urm.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,17 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in
136136
if filter.UserID.Valid() {
137137
// urm by user index lookup
138138
userID, _ := filter.UserID.Encode()
139-
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error {
139+
if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) (bool, error) {
140140
m := &influxdb.UserResourceMapping{}
141141
if err := json.Unmarshal(v, m); err != nil {
142-
return CorruptURMError(err)
142+
return false, CorruptURMError(err)
143143
}
144144

145145
if filterFn(m) {
146146
ms = append(ms, m)
147147
}
148148

149-
return nil
149+
return true, nil
150150
}); err != nil {
151151
return nil, err
152152
}

paging.go

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ type FindOptions struct {
3434
Descending bool
3535
}
3636

37+
// GetLimit returns the resolved limit between then limit boundaries.
38+
// Given a limit <= 0 it returns the default limit.
39+
func (f *FindOptions) GetLimit() int {
40+
if f == nil || f.Limit <= 0 {
41+
return DefaultPageSize
42+
}
43+
44+
if f.Limit > MaxPageSize {
45+
return MaxPageSize
46+
}
47+
48+
return f.Limit
49+
}
50+
3751
// DecodeFindOptions returns a FindOptions decoded from http request.
3852
func DecodeFindOptions(r *http.Request) (*FindOptions, error) {
3953
opts := &FindOptions{}

secret/storage.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,20 @@ func (s *Storage) ListSecret(ctx context.Context, tx kv.Tx, orgID influxdb.ID) (
8181

8282
keys := []string{}
8383

84-
err = kv.WalkCursor(ctx, cur, func(k, v []byte) error {
84+
err = kv.WalkCursor(ctx, cur, func(k, v []byte) (bool, error) {
8585
id, key, err := decodeSecretKey(k)
8686
if err != nil {
87-
return err
87+
return false, err
8888
}
8989

9090
if id != orgID {
9191
// We've reached the end of the keyspace for the provided orgID
92-
return nil
92+
return false, nil
9393
}
9494

9595
keys = append(keys, key)
96-
return nil
96+
97+
return true, nil
9798
})
9899
if err != nil {
99100
return nil, err

telegraf.go

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ type TelegrafConfigStore interface {
5454
type TelegrafConfigFilter struct {
5555
OrgID *ID
5656
Organization *string
57-
UserResourceMappingFilter
5857
}
5958

6059
// TelegrafConfig stores telegraf config for one telegraf instance.

telegraf/service/telegraf.go

+29-8
Original file line numberDiff line numberDiff line change
@@ -155,25 +155,40 @@ func (s *Service) findTelegrafConfigByID(ctx context.Context, tx kv.Tx, id influ
155155
// Additional options provide pagination & sorting.
156156
func (s *Service) FindTelegrafConfigs(ctx context.Context, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) (tcs []*influxdb.TelegrafConfig, n int, err error) {
157157
err = s.kv.View(ctx, func(tx kv.Tx) error {
158-
tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter)
158+
tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter, opt...)
159159
return err
160160
})
161161
return tcs, n, err
162162
}
163163

164164
func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) ([]*influxdb.TelegrafConfig, int, error) {
165-
tcs := make([]*influxdb.TelegrafConfig, 0)
165+
var (
166+
limit = influxdb.DefaultPageSize
167+
offset int
168+
count int
169+
tcs = make([]*influxdb.TelegrafConfig, 0)
170+
)
171+
172+
if len(opt) > 0 {
173+
limit = opt[0].GetLimit()
174+
offset = opt[0].Offset
175+
}
166176

167-
visit := func(k, v []byte) error {
177+
visit := func(k, v []byte) (bool, error) {
168178
var tc influxdb.TelegrafConfig
169179
if err := json.Unmarshal(v, &tc); err != nil {
170-
return err
180+
return false, err
171181
}
172182

173-
tcs = append(tcs, &tc)
183+
// skip until offset reached
184+
if count >= offset {
185+
tcs = append(tcs, &tc)
186+
}
174187

175-
return nil
188+
count++
176189

190+
// stop cursing when limit is reached
191+
return len(tcs) < limit, nil
177192
}
178193

179194
if filter.OrgID == nil {
@@ -183,8 +198,14 @@ func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter infl
183198
return nil, 0, err
184199
}
185200

186-
// TODO(georgemac): convert find options into cursor options
187-
cursor, err := bucket.ForwardCursor(nil)
201+
// cursors do not support numeric offset
202+
// but we can at least constrain the response
203+
// size by the offset + limit since we are
204+
// not doing any other filtering
205+
// REMOVE this cursor option if you do any
206+
// other filtering
207+
208+
cursor, err := bucket.ForwardCursor(nil, kv.WithCursorLimit(offset+limit))
188209
if err != nil {
189210
return nil, 0, err
190211
}

0 commit comments

Comments
 (0)