-
Notifications
You must be signed in to change notification settings - Fork 144
Expand file tree
/
Copy pathcollection_n1ql_common.go
More file actions
745 lines (644 loc) · 26.9 KB
/
collection_n1ql_common.go
File metadata and controls
745 lines (644 loc) · 26.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
/*
Copyright 2021-Present Couchbase, Inc.
Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/
package base
import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"time"
"github.com/couchbase/gocb/v2"
sgbucket "github.com/couchbase/sg-bucket"
pkgerrors "github.com/pkg/errors"
)
// ConsistencyMode selects the scan consistency requested for a query.
//
// gocb v1 and v2 each declare their own consistency type, but the values are consistent
// for the options SG uses (NotBounded, RequestPlus). Declaring a local type keeps versioned
// gocb imports confined to the bucket/collection implementations.
type ConsistencyMode int

const (
	// NotBounded indicates no data consistency is required.
	NotBounded ConsistencyMode = 1
	// RequestPlus indicates that request-level data consistency is required.
	RequestPlus ConsistencyMode = 2
)
// WaitForIndexesOnlineOption selects how long the WaitFor...IndexesOnline helpers keep polling
// before giving up.
type WaitForIndexesOnlineOption int8

const (
	// WaitForIndexesDefault will wait a standard amount of time for indexes to come online
	WaitForIndexesDefault = WaitForIndexesOnlineOption(iota)
	// WaitForIndexesFailfast will fail immediately if the indexes are not online
	WaitForIndexesFailfast
	// WaitForIndexesInfinite will wait an indefinite amount of time for indexes to come online, or until the context is cancelled.
	WaitForIndexesInfinite
)
// N1QLStore defines the set of operations Sync Gateway uses to manage and interact with N1QL.
// The package-level helpers in this file (CreateIndex, DropIndex, GetIndexMeta, ...) take a
// N1QLStore as the target keyspace; AsN1QLStore shows *Collection and *LeakyDataStore being
// returned as implementations.
type N1QLStore interface {
// GetName returns the store's name. NOTE(review): exact format (bucket vs. full keyspace) is not visible here.
GetName() string
// Index management and query operations. The package-level functions of the same names in
// this file accept a N1QLStore as their target.
BuildDeferredIndexes(ctx context.Context, indexSet []string) error
CreateIndex(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
CreateIndexIfNotExists(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error
DropIndex(ctx context.Context, indexName string) error
ExplainQuery(ctx context.Context, statement string, params map[string]any) (plan map[string]any, err error)
GetIndexMeta(ctx context.Context, indexName string) (exists bool, meta *IndexMeta, err error)
Query(ctx context.Context, statement string, params map[string]any, consistency ConsistencyMode, adhoc bool) (results sgbucket.QueryResultIterator, err error)
// IsErrNoResults reports whether err represents an empty result set. GetIndexesMeta and
// GetSystemCollectionIndexesMeta translate that case into a nil map with a nil error.
IsErrNoResults(error) bool
// EscapedKeyspace returns the keyspace identifier that is interpolated verbatim into
// CREATE/BUILD/DROP INDEX statements, and substituted for KeyspaceQueryToken in index expressions.
EscapedKeyspace() string
// IndexMetaBucketID, IndexMetaScopeID and IndexMetaKeyspaceID supply the values matched against
// bucket_id, scope_id and keyspace_id in system:indexes by GetIndexesMeta. An empty bucket or
// scope ID causes the corresponding filter to be left out of the query.
IndexMetaBucketID() string
IndexMetaScopeID() string
IndexMetaKeyspaceID() string
// BucketName returns the bucket name; it is used as bucket_id when querying system:all_indexes.
BucketName() string
WaitForIndexesOnline(ctx context.Context, indexNames []string, option WaitForIndexesOnlineOption) error
// executeQuery performs the specified query without any built-in retry handling and returns the resultset
executeQuery(statement string) (sgbucket.QueryResultIterator, error)
// executeStatement executes the specified statement and closes the response, returning any errors received.
executeStatement(statement string) error
// GetIndexes retrieves all index names, used by test harness
GetIndexes() (indexes []string, err error)
// waitUntilQueryServiceReady waits until the query service is ready to accept requests
waitUntilQueryServiceReady(timeout time.Duration) error
// IsSupported (from the embedded interface) is consulted by createIndex to decide whether
// IF NOT EXISTS DDL can be used.
sgbucket.BucketStoreFeatureIsSupported
}
// ExplainQuery runs "EXPLAIN <statement>" against store with RequestPlus consistency as an adhoc
// query and decodes the first result row into plan.
//
// The result set is closed before decoding; a close error is returned in preference to decoding
// the row. A decode error is returned alongside whatever was decoded into plan.
func ExplainQuery(ctx context.Context, store N1QLStore, statement string, params map[string]any) (plan map[string]any, err error) {
	results, queryErr := store.Query(ctx, "EXPLAIN "+statement, params, RequestPlus, true)
	if queryErr != nil {
		return nil, queryErr
	}

	// Only the first row carries the plan; grab it before closing the iterator.
	planBytes := results.NextBytes(ctx)
	if closeErr := results.Close(ctx); closeErr != nil {
		return nil, closeErr
	}

	err = JSONUnmarshal(planBytes, &plan)
	return plan, err
}
// indexManager lists query indexes either through a collection-level gocb index manager or,
// when that is not set, through the cluster-level manager addressed by bucket, scope and
// collection name (see GetAllIndexes).
type indexManager struct {
// cluster is used by GetAllIndexes only when collection is nil.
cluster *gocb.QueryIndexManager
// collection, when non-nil, is preferred by GetAllIndexes.
collection *gocb.CollectionQueryIndexManager
// bucketName is passed to the cluster-level GetAllIndexes call.
bucketName string
// scopeName and collectionName are sent as the deprecated ScopeName/CollectionName options
// on the cluster-level call.
scopeName string
collectionName string
// useGOCBFastFailRetry is passed to goCBRetryStrategy to choose the gocb retry strategy.
useGOCBFastFailRetry bool
}
// GetAllIndexes returns every query index visible to this manager. The collection-level manager
// is used when available; otherwise the cluster-level manager is queried for the configured
// bucket, scope and collection.
func (im *indexManager) GetAllIndexes() ([]gocb.QueryIndex, error) {
	retryStrategy := goCBRetryStrategy(im.useGOCBFastFailRetry)

	if im.collection != nil {
		return im.collection.GetAllIndexes(&gocb.GetAllQueryIndexesOptions{
			RetryStrategy: retryStrategy,
		})
	}

	// ScopeName and CollectionName options are deprecated (and skipped for staticcheck) as of gocb v2.7.0
	// (GOCBC-1391). When these run on more than a single collection (CBG-3026) this should be replaced with
	// a N1QL query rather than a gocb call.
	clusterOpts := gocb.GetAllQueryIndexesOptions{
		RetryStrategy: retryStrategy,
	}
	clusterOpts.ScopeName = im.scopeName           // nolint:staticcheck
	clusterOpts.CollectionName = im.collectionName // nolint:staticcheck
	return im.cluster.GetAllIndexes(im.bucketName, &clusterOpts)
}
// CreateIndex issues a CREATE INDEX query in the N1QLStore keyspace, using the form:
//
//	CREATE INDEX indexName ON bucket.Name(expression) WHERE filterExpression WITH options
//
// Sample usage with resulting statement:
//
//	CreateIndex("myIndex", "field1, field2, nested.field", "field1 > 0", N1qlIndexOptions{numReplica:1})
//	CREATE INDEX myIndex on myBucket(field1, field2, nested.field) WHERE field1 > 0 WITH {"numReplica":1}
//
// An index that already exists is reported as ErrAlreadyExists.
func CreateIndex(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
	const ifNotExists = false
	return createIndex(ctx, store, indexName, expression, filterExpression, ifNotExists, options)
}
// CreateIndexIfNotExists issues a CREATE INDEX query in the N1QLStore keyspace, using the form:
//
//	CREATE INDEX indexName IF NOT EXISTS ON bucket.Name(expression) WHERE filterExpression WITH options
//
// The IF NOT EXISTS clause is only emitted when the store reports support for it; otherwise an
// "already exists" error from the server is swallowed so the outcome is the same.
//
// Sample usage with resulting statement:
//
//	CreateIndexIfNotExists("myIndex", "field1, field2, nested.field", "field1 > 0", N1qlIndexOptions{numReplica:1})
//	CREATE INDEX myIndex IF NOT EXISTS on myBucket(field1, field2, nested.field) WHERE field1 > 0 WITH {"numReplica":1}
func CreateIndexIfNotExists(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
	const ifNotExists = true
	return createIndex(ctx, store, indexName, expression, filterExpression, ifNotExists, options)
}
// createIndex is a common function for CreateIndex and CreateIndexIfNotExists.
//
// It assembles a CREATE INDEX statement for store's keyspace from indexName, expression and the
// optional filterExpression, adds PARTITION BY HASH(META().id) when options requests more than one
// partition, replaces any KeyspaceQueryToken with the escaped keyspace, and executes it via
// createIndexFromStatement (which appends the WITH clause).
//
// options may be nil. When the index already exists the function returns nil if ifNotExists is
// set, and ErrAlreadyExists otherwise. Any other error from createIndexFromStatement is returned
// unchanged.
func createIndex(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, ifNotExists bool, options *N1qlIndexOptions) error {
	var ifNotExistsStr string
	// Server 7.1+ - we can still safely _not_ use this when it's not available, because we have equivalent error handling inside this function to swallow `ErrAlreadyExists`.
	// Would still prefer to use it when we can, to guard us against future error string changes, which is why we do both conditionally.
	if ifNotExists && store.IsSupported(sgbucket.BucketStoreFeatureN1qlIfNotExistsDDL) {
		ifNotExistsStr = " IF NOT EXISTS"
	}

	// Add filter expression, when present
	var filterExpressionStr string
	if filterExpression != "" {
		filterExpressionStr = " WHERE " + filterExpression
	}

	// options is optional (createIndexFromStatement accepts nil), so guard before dereferencing it.
	var partitionExpressionStr string
	if options != nil && options.NumPartitions != nil && *options.NumPartitions > 1 {
		partitionExpressionStr = " PARTITION BY HASH(META().id)"
	}

	createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s %s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, partitionExpressionStr, filterExpressionStr)

	// Replace any KeyspaceQueryToken references in the index expression
	createStatement = strings.ReplaceAll(createStatement, KeyspaceQueryToken, store.EscapedKeyspace())

	createErr := createIndexFromStatement(ctx, store, indexName, createStatement, options)
	if IsIndexAlreadyExistsError(createErr) || IsCreateDuplicateIndexError(createErr) {
		// Pre-7.1 compatibility: Swallow this error like Server does when specifying `IF NOT EXISTS`
		if ifNotExists {
			return nil
		}
		return ErrAlreadyExists
	}
	return createErr
}
// CreatePrimaryIndex issues a CREATE PRIMARY INDEX statement for indexName on store's keyspace.
// When options is non-nil it is appended as the WITH clause by createIndexFromStatement.
func CreatePrimaryIndex(ctx context.Context, store N1QLStore, indexName string, options *N1qlIndexOptions) error {
	statement := "CREATE PRIMARY INDEX `" + indexName + "` ON " + store.EscapedKeyspace()
	return createIndexFromStatement(ctx, store, indexName, statement, options)
}
// ErrIndexBackgroundRetry is returned when an index creation operation returned an error but just needs to wait for a server-side readiness or retry.
var ErrIndexBackgroundRetry = errors.New("Indexer error - waiting for server background retry")

// createIndexFromStatement executes createStatement against store, first appending options
// (JSON-encoded) as a "with" clause when options is non-nil.
//
// Returns nil on success. Indexer "retry" errors and duplicate-index-name errors are wrapped in
// ErrIndexBackgroundRetry (the original message is retained in the error text). Any other
// failure is returned as a redacted error with a stack trace.
func createIndexFromStatement(ctx context.Context, store N1QLStore, indexName string, createStatement string, options *N1qlIndexOptions) error {
	if options != nil {
		withClause, marshalErr := JSONMarshal(options)
		if marshalErr != nil {
			return marshalErr
		}
		createStatement = fmt.Sprintf(`%s with %s`, createStatement, withClause)
	}

	TracefCtx(ctx, KeyQuery, "Attempting to create index %q using statement: [%s]", indexName, UD(createStatement))

	err := store.executeStatement(createStatement)
	switch {
	case err == nil:
		return nil
	case IsIndexerRetryIndexError(err), IsCreateDuplicateIndexError(err):
		// The server is already working on this index; callers can wait for it rather than fail.
		DebugfCtx(ctx, KeyQuery, "Index %q is already being created on server: %v", indexName, err)
		return fmt.Errorf("%w: %s", ErrIndexBackgroundRetry, err.Error())
	default:
		return pkgerrors.WithStack(RedactErrorf("Error creating index with statement: %s. Error: %v", UD(createStatement), err))
	}
}
// waitForIndexExistence polls GetIndexMeta until the existence of indexName matches shouldExist.
// Used in response to background create/drop processing by server.
//
// A GetIndexMeta error stops the polling. Any error from the retry loop is returned wrapped with
// the index name.
func waitForIndexExistence(ctx context.Context, store N1QLStore, indexName string, shouldExist bool) error {
	checkState := func() (shouldRetry bool, err error, value any) {
		// GetIndexMeta has its own error retry handling,
		// but keep the retry logic up here for checking if the index exists.
		exists, _, metaErr := store.GetIndexMeta(ctx, indexName)
		if metaErr != nil {
			return false, metaErr, nil
		}
		// Keep retrying only while the index is not yet in the desired state.
		return exists != shouldExist, nil, nil
	}

	sleeper := CreateMaxDoublingSleeperFunc(25, 100, 15000)
	if err, _ := RetryLoop(ctx, "waitForIndexExistence", checkState, sleeper); err != nil {
		return pkgerrors.Wrapf(err, "Error during waitForIndexExistence for index %s", indexName)
	}
	return nil
}
// BuildDeferredIndexes issues a build command for any deferred sync gateway indexes associated with the N1QLStore keyspace.
//
// An empty indexSet is a no-op. The build is retried while buildIndexes returns an error; the
// last error is returned if the retry loop gives up.
func BuildDeferredIndexes(ctx context.Context, s N1QLStore, indexSet []string) error {
	if len(indexSet) == 0 {
		return nil
	}

	InfofCtx(ctx, KeyQuery, "Building deferred indexes: %v", indexSet)

	// The provided indexes can be in a state that is not yet ready to take a build command:
	// there's a delay between the time of index creation and when it's actually found in the
	// system:indexes table, so buildIndexes can return a not found error for an index that was
	// very recently created.
	attempt := func() (shouldRetry bool, err error, value any) {
		err = buildIndexes(ctx, s, indexSet)
		if err == nil {
			return false, nil, nil
		}
		if IsIndexNotFoundError(err) {
			DebugfCtx(ctx, KeyQuery, "Index not found error when building indexes - will retry: %v", err)
		}
		return true, err, nil
	}

	// Initial retry 1 seconds, max wait 30s, waits up to 10m
	err, _ := RetryLoop(ctx, "BuildDeferredIndexes", attempt, CreateMaxDoublingSleeperFunc(20, 1000, 30000))
	return err
}
// buildIndexes executes a BUILD INDEX statement in the N1QLStore keyspace, using the form:
//
//	BUILD INDEX ON `bucket.Name`(`index1`,`index2`,...)
//
// An empty indexNames is a no-op. Indexer errors that announce a background retry are logged and
// treated as success; every other error is returned.
func buildIndexes(ctx context.Context, s N1QLStore, indexNames []string) error {
	if len(indexNames) == 0 {
		return nil
	}

	// Not using strings.Join because we want to escape each index name
	escapedNames := StringSliceToN1QLArray(indexNames, "`")
	statement := "BUILD INDEX ON " + s.EscapedKeyspace() + "(" + escapedNames + ")"

	buildErr := s.executeStatement(statement)
	if !IsIndexerRetryBuildError(buildErr) {
		return buildErr
	}

	InfofCtx(ctx, KeyQuery, "Indexer returned error that will be automatically retried by the index service - waiting for that to complete. Error:%v", buildErr)
	return nil
}
// IndexMeta represents a Couchbase GSI index. It is decoded from rows of the form
// {"name": ..., "state": ...} selected from system:indexes / system:all_indexes.
type IndexMeta struct {
Name string `json:"name"` // name of the index
State string `json:"state"` // index state, e.g. online, building, pending, offline; compared against IndexStateOnline by the wait helpers
}
// getIndexMetaRetryValues is the value handed back through RetryLoop by GetIndexMeta's worker.
type getIndexMetaRetryValues struct {
// exists is true when the index name was present in the GetIndexesMeta result.
exists bool
// meta is the metadata for the index; it is the zero value when exists is false.
meta IndexMeta
}
// GetIndexMeta looks up a single index by name, retrying while the underlying GetIndexesMeta
// query returns an error.
//
// On success exists reports whether the index was found and meta points at its metadata (the
// zero IndexMeta when it was not found). If the retry loop fails, the error is returned wrapped
// with the index name and meta is nil.
func GetIndexMeta(ctx context.Context, store N1QLStore, indexName string) (exists bool, meta *IndexMeta, err error) {
	lookup := func() (shouldRetry bool, err error, value *getIndexMetaRetryValues) {
		found, queryErr := GetIndexesMeta(ctx, store, []string{indexName})
		if queryErr != nil {
			// retry
			WarnfCtx(ctx, "Error from GetIndexMeta for index %s: %v will retry", indexName, queryErr)
			return true, queryErr, nil
		}
		entry, present := found[indexName]
		return false, nil, &getIndexMetaRetryValues{exists: present, meta: entry}
	}

	// Kick off retry loop
	sleeper := CreateMaxDoublingSleeperFunc(25, 100, 15000)
	err, result := RetryLoop(ctx, "GetIndexMeta", lookup, sleeper)
	if err != nil {
		return false, nil, pkgerrors.Wrapf(err, "Error during GetIndexMeta for index %s", indexName)
	}
	return result.exists, &result.meta, nil
}
// GetIndexesMeta returns the status of a given set of indexes as a map keyed by index name. If an
// index is not present, the value will be omitted from the map.
//
// The lookup queries system:indexes filtered by the store's keyspace ID and, when non-empty, its
// bucket and scope IDs. A nil indexNames is rejected with an error. A "no results" error from the
// store yields a nil map and nil error.
func GetIndexesMeta(ctx context.Context, store N1QLStore, indexNames []string) (map[string]IndexMeta, error) {
	if indexNames == nil {
		return nil, fmt.Errorf("Must specify index names")
	}

	quotedNames := make([]string, len(indexNames))
	for i, name := range indexNames {
		quotedNames[i] = strconv.Quote(name)
	}

	statement := fmt.Sprintf(
		"SELECT name,state FROM system:indexes WHERE indexes.name IN [%s] AND indexes.keyspace_id = '%s'",
		strings.Join(quotedNames, ","),
		store.IndexMetaKeyspaceID(),
	)
	if bucketID := store.IndexMetaBucketID(); bucketID != "" {
		statement += " AND indexes.bucket_id = '" + bucketID + "'"
	}
	if scopeID := store.IndexMetaScopeID(); scopeID != "" {
		statement += " AND indexes.scope_id = '" + scopeID + "'"
	}

	results, err := store.executeQuery(statement)
	if store.IsErrNoResults(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer func() {
		if closeErr := results.Close(ctx); closeErr != nil {
			WarnfCtx(ctx, "Error closing results from GetIndexesMeta: %v", closeErr)
		}
	}()

	found := make(map[string]IndexMeta)
	var row IndexMeta
	for results.Next(ctx, &row) {
		found[row.Name] = row
	}
	return found, nil
}
// DropIndex drops the specified index from the N1QLStore keyspace.
//
// If the indexer reports that the drop will be retried or cleaned up in the background, DropIndex
// waits until the index no longer exists and returns the result of that wait. Any other error is
// returned unchanged.
func DropIndex(ctx context.Context, store N1QLStore, indexName string) error {
	statement := "DROP INDEX default:" + store.EscapedKeyspace() + ".`" + indexName + "`"

	dropErr := store.executeStatement(statement)
	if dropErr == nil {
		return nil
	}
	if !IsIndexerRetryIndexError(dropErr) {
		return dropErr
	}

	InfofCtx(ctx, KeyQuery, "Indexer error dropping index - waiting for server background retry. Error:%v", dropErr)
	// Wait for index to be dropped in background before returning
	return waitForIndexExistence(ctx, store, indexName, false)
}
// AsN1QLStore tries to return the given DataStore as a N1QLStore. The boolean is false, and the
// store nil, for unrecognised/unsupported data store types.
func AsN1QLStore(dataStore DataStore) (N1QLStore, bool) {
	if collection, ok := dataStore.(*Collection); ok {
		return collection, true
	}
	if leaky, ok := dataStore.(*LeakyDataStore); ok {
		return leaky, true
	}
	return nil, false
}
// IsIndexNotFoundError reports whether err looks like an "index not found" error.
//
// Index not found errors (returned by DropIndex) don't have a specific N1QL error code - they are of the form:
//
//	[5000] GSI index testIndex_not_found not found.
//
// so the only way to tell 'not found' apart from other errors is a string match on the message.
// A nil err is never a match.
func IsIndexNotFoundError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "not found")
}
// IsIndexerRetryIndexError reports whether err is an indexer error announcing that the
// create/drop operation will be retried or cleaned up in the background.
//
// 'IsIndexerRetry' type errors are of the form:
// error:[5000] GSI CreateIndex() - cause: Encountered transient error. Index creation will be retried in background. Error: Index testIndex_value will retry building in the background for reason: Bucket test_data_bucket In Recovery.
// error:[5000] GSI Drop() - cause: Fail to drop index on some indexer nodes. Error=Encountered error when dropping index: Indexer In Recovery. Drop index will be retried in background.
// error:[5000] BuildIndexes - cause: Build index fails. %vIndex testIndexDeferred will retry building in the background for reason: Build Already In Progress. Bucket test_data_bucket.
//
// https://issues.couchbase.com/browse/MB-19358 is filed to request improved indexer error codes for these scenarios (and others)
func IsIndexerRetryIndexError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	switch {
	case strings.Contains(msg, "will retry"):
		return true
	case strings.Contains(msg, "will be retried"):
		return true
	// [{\"code\":5000,\"message\":\"GSI Drop() - cause: Fail to Drop Index due to internal errors. Cleanup may happen in the background. Error=DeleteScheduleCreateToken:*:Rev mismatch.\"}]
	case strings.Contains(msg, "Drop Index due to internal errors. Cleanup may happen in the background"):
		return true
	}
	return false
}
// IsCreateDuplicateIndexError reports whether err's message contains "duplicate index name".
// A nil err is never a match.
func IsCreateDuplicateIndexError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "duplicate index name")
}
// IsIndexerRetryBuildError reports whether err's message says the indexer "will retry" the
// operation or that it "will be retried". A nil err is never a match.
func IsIndexerRetryBuildError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "will retry") || strings.Contains(msg, "will be retried")
}
// IsIndexAlreadyExistsError reports whether err's message contains "already exists".
// A nil err is never a match.
func IsIndexAlreadyExistsError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "already exists")
}
// isTransientIndexerError checks for transient indexer errors (can be retried): either an
// "Indexer rollback" message or anything IsIndexerRetryBuildError recognises. A nil err is
// never transient.
func isTransientIndexerError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "Indexer rollback") || IsIndexerRetryBuildError(err)
}
// SlowQueryLog writes an info-level query log entry when the time elapsed since startTime is
// strictly greater than threshold. The entry is messageFormat (formatted with args) followed by
// " took <elapsed>". Nothing is logged otherwise.
func SlowQueryLog(ctx context.Context, startTime time.Time, threshold time.Duration, messageFormat string, args ...any) {
	elapsed := time.Since(startTime)
	if elapsed <= threshold {
		return
	}
	InfofCtx(ctx, KeyQuery, messageFormat+" took "+elapsed.String(), args...)
}
// StringSliceToN1QLArray joins values with commas, wrapping each value in quote.
// For example, values ["value1", "value2"] with quote "`" yields `value1`,`value2`.
//
// An empty or nil values returns "". Values are inserted verbatim: any occurrence of quote
// inside a value is NOT escaped.
func StringSliceToN1QLArray(values []string, quote string) string {
	if len(values) == 0 {
		return ""
	}

	// Size the buffer up front: one separator between entries plus two quotes per value.
	size := len(values) - 1
	for _, v := range values {
		size += len(v) + 2*len(quote)
	}

	var b strings.Builder
	b.Grow(size)
	for idx, v := range values {
		if idx > 0 {
			b.WriteByte(',')
		}
		b.WriteString(quote)
		b.WriteString(v)
		b.WriteString(quote)
	}
	return b.String()
}
// gocbResultRaw describes a raw gocb result (both view and n1ql). gocbRawIterator wraps a value
// of this type to implement the sgbucket.QueryResultIterator interface.
type gocbResultRaw interface {
// NextBytes returns the next row as bytes. gocbRawIterator treats a nil return as the end of the result set.
NextBytes() []byte
// Err returns any errors that have occurred on the stream
Err() error
// Close marks the results as closed, returning any errors that occurred during reading the results.
Close() error
// MetaData returns any meta-data that was available from this query as bytes.
MetaData() ([]byte, error)
}
// gocbRawIterator wraps a gocbResultRaw (a raw gocb v2 view or n1ql result) to implement
// sgbucket.QueryResultIterator.
type gocbRawIterator struct {
// rawResult is the underlying result stream that rows are read from.
rawResult gocbResultRaw
// concurrentQueryOpLimitChan, when non-nil, has one value received from it when the iterator
// is closed. NOTE(review): presumably this releases a slot taken when the query was issued -
// confirm against the code that constructs the iterator.
concurrentQueryOpLimitChan chan struct{}
}
// One unmarshals a single result row into valuePtr, and then closes the iterator.
//
// Returns nil when a row was read. When no row could be read, the iterator is closed and the
// close error is returned if there is one; otherwise gocb.ErrNoResult is returned. A nil return
// therefore always means valuePtr was populated.
func (i *gocbRawIterator) One(ctx context.Context, valuePtr any) error {
	if !i.Next(ctx, valuePtr) {
		// No row was read: surface the underlying failure rather than reporting success
		// with an unpopulated value.
		if err := i.Close(ctx); err != nil {
			return err
		}
		return gocb.ErrNoResult
	}

	// Ignore any errors occurring after we already have our result
	//  - follows approach used by gocb v1 One() implementation
	_ = i.Close(ctx)
	return nil
}
// Next unmarshals the next result row into valuePtr. Returns false when reaching end of result
// set, and also when the row cannot be unmarshalled (in which case a warning is logged).
func (i *gocbRawIterator) Next(ctx context.Context, valuePtr any) bool {
	row := i.rawResult.NextBytes()
	if row == nil {
		return false
	}
	if unmarshalErr := JSONUnmarshal(row, &valuePtr); unmarshalErr != nil {
		WarnfCtx(ctx, "Unable to marshal view result row into value: %v", unmarshalErr)
		return false
	}
	return true
}
// NextBytes retrieves the raw bytes for the next result row, exactly as returned by the
// underlying raw result. The context argument is unused.
func (i *gocbRawIterator) NextBytes(_ context.Context) []byte {
	row := i.rawResult.NextBytes()
	return row
}
// Close closes the iterator. Any unread rows are drained first, then the underlying result is
// closed. A close error takes precedence; otherwise any row-level error seen during iteration is
// returned. On return, one value is received from concurrentQueryOpLimitChan when it is set.
func (i *gocbRawIterator) Close(_ context.Context) error {
	// Have to iterate over any remaining results to clear the reader
	// Otherwise we get "the result must be closed before accessing the meta-data" on close details on CBG-1666
	for {
		if i.rawResult.NextBytes() == nil {
			break
		}
	}

	defer func() {
		if i.concurrentQueryOpLimitChan != nil {
			<-i.concurrentQueryOpLimitChan
		}
	}()

	// check for errors before closing?
	if err := i.rawResult.Close(); err != nil {
		return err
	}
	return i.rawResult.Err()
}
// IndexMetaKeyspaceID returns the keyspace_id value to match for index metadata lookups: the
// bucket name when scopeName/collectionName identify the default collection (per
// IsDefaultCollection), and the collection name otherwise.
func IndexMetaKeyspaceID(bucketName, scopeName, collectionName string) string {
	keyspaceID := collectionName
	if IsDefaultCollection(scopeName, collectionName) {
		keyspaceID = bucketName
	}
	return keyspaceID
}
// WaitForIndexesOnline takes set of indexes and watches them till they're online.
//
// Each attempt lists all indexes through mgr and records any watched index whose state is
// IndexStateOnline; an index is logged as online only once across attempts. The call succeeds as
// soon as every entry in indexNames has been seen online. waitOption selects the retry policy;
// an unknown option is rejected immediately. An error from mgr.GetAllIndexes stops the wait.
// The keyspace argument is not used by this function.
func WaitForIndexesOnline(ctx context.Context, keyspace string, mgr *indexManager, indexNames []string, waitOption WaitForIndexesOnlineOption) error {
	const (
		initialWaitTime = 100
		maxSleepTime    = 5000
	)

	var sleeper RetrySleeper
	switch waitOption {
	case WaitForIndexesDefault:
		sleeper = CreateMaxDoublingSleeperFunc(180, initialWaitTime, maxSleepTime)
	case WaitForIndexesFailfast:
		sleeper = CreateFastFailRetrySleeperFunc()
	case WaitForIndexesInfinite:
		sleeper = CreateIndefiniteMaxDoublingSleeperFunc(initialWaitTime, maxSleepTime)
	default:
		return fmt.Errorf("Invalid WaitForIndexesOnlineOption: %d", waitOption)
	}

	// Remembered across attempts so each index coming online is only logged once.
	seenOnline := make(map[string]bool)

	poll := func() (shouldRetry bool, err error, _ any) {
		current, listErr := mgr.GetAllIndexes()
		if listErr != nil {
			return false, listErr, nil
		}

		for _, idx := range current {
			// use slices.Contains since the number of indexes is expected to be small
			if idx.State != IndexStateOnline || !slices.Contains(indexNames, idx.Name) {
				continue
			}
			if seenOnline[idx.Name] {
				continue
			}
			InfofCtx(ctx, KeyAll, "Index %s is online", MD(idx.Name))
			seenOnline[idx.Name] = true
		}

		// Anything we're watching that hasn't been seen online yet keeps the loop going.
		var pending []string
		for _, name := range indexNames {
			if !seenOnline[name] {
				pending = append(pending, name)
			}
		}
		if len(pending) == 0 {
			return false, nil, nil
		}

		DebugfCtx(ctx, KeyAll, "Indexes %s not ready - retrying...", strings.Join(pending, ", "))
		return true, nil, nil
	}

	err, _ := RetryLoop(ctx, "WaitForIndexesOnline", poll, sleeper)
	return err
}
// GetSystemCollectionIndexesMeta queries system:all_indexes (which, unlike system:indexes, includes
// indexes on system-scope collections such as _system._mobile) and returns state metadata for the
// named indexes, keyed by index name. The target keyspace is identified by store.BucketName()
// together with scopeName and collectionName.
//
// An empty indexNames is rejected with an error. A "no results" error from the store yields a nil
// map and nil error.
func GetSystemCollectionIndexesMeta(ctx context.Context, store N1QLStore, scopeName, collectionName string, indexNames []string) (map[string]IndexMeta, error) {
	if len(indexNames) == 0 {
		return nil, fmt.Errorf("must specify at least one index name")
	}

	// Index names are escaped via strconv.Quote.
	nameList := make([]string, len(indexNames))
	for i, indexName := range indexNames {
		nameList[i] = strconv.Quote(indexName)
	}

	// bucket_id, scope_id, and keyspace_id are trusted internal values originating from the
	// CBS cluster topology (not user input), so direct interpolation is safe here.
	const queryFormat = "SELECT name, state FROM system:all_indexes WHERE bucket_id = '%s' AND scope_id = '%s' AND keyspace_id = '%s' AND name IN [%s]"
	statement := fmt.Sprintf(queryFormat, store.BucketName(), scopeName, collectionName, strings.Join(nameList, ","))

	results, err := store.executeQuery(statement)
	if store.IsErrNoResults(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer func() {
		closeErr := results.Close(ctx)
		if closeErr != nil {
			WarnfCtx(ctx, "Error closing results from GetSystemCollectionIndexesMeta: %v", closeErr)
		}
	}()

	found := make(map[string]IndexMeta)
	var row IndexMeta
	for results.Next(ctx, &row) {
		found[row.Name] = row
	}
	return found, nil
}
// WaitForSystemCollectionIndexesOnline waits for the named indexes to come online on a
// system-scope collection (e.g. _system._mobile). CBS omits system-scope indexes from
// system:indexes, so this function queries system:all_indexes instead. store is used only
// to run the query; scopeName and collectionName identify the target keyspace within the
// store's bucket.
//
// waitOption selects the retry policy; an unknown option is rejected immediately. An error from
// the metadata query stops the wait. Each index is logged as online only once across attempts.
func WaitForSystemCollectionIndexesOnline(ctx context.Context, store N1QLStore, scopeName, collectionName string, indexNames []string, waitOption WaitForIndexesOnlineOption) error {
	const (
		initialWaitTime = 100
		maxSleepTime    = 5000
	)

	var sleeper RetrySleeper
	switch waitOption {
	case WaitForIndexesDefault:
		sleeper = CreateMaxDoublingSleeperFunc(180, initialWaitTime, maxSleepTime)
	case WaitForIndexesFailfast:
		sleeper = CreateFastFailRetrySleeperFunc()
	case WaitForIndexesInfinite:
		sleeper = CreateIndefiniteMaxDoublingSleeperFunc(initialWaitTime, maxSleepTime)
	default:
		return fmt.Errorf("invalid WaitForIndexesOnlineOption: %d", waitOption)
	}

	// Remembered across attempts so each index coming online is only logged once.
	seenOnline := make(map[string]bool)
	keyspace := store.BucketName() + "." + scopeName + "." + collectionName

	poll := func() (shouldRetry bool, err error, _ any) {
		current, metaErr := GetSystemCollectionIndexesMeta(ctx, store, scopeName, collectionName, indexNames)
		if metaErr != nil {
			return false, metaErr, nil
		}

		for _, idx := range current {
			if idx.State != IndexStateOnline || seenOnline[idx.Name] {
				continue
			}
			InfofCtx(ctx, KeyAll, "Index %s on %s is online", MD(idx.Name), MD(keyspace))
			seenOnline[idx.Name] = true
		}

		// Anything we're watching that hasn't been seen online yet keeps the loop going.
		var pending []string
		for _, name := range indexNames {
			if !seenOnline[name] {
				pending = append(pending, name)
			}
		}
		if len(pending) == 0 {
			return false, nil, nil
		}

		DebugfCtx(ctx, KeyAll, "Indexes %s on %s not ready - retrying...", strings.Join(pending, ", "), MD(keyspace))
		return true, nil, nil
	}

	err, _ := RetryLoop(ctx, "WaitForSystemCollectionIndexesOnline", poll, sleeper)
	return err
}
// GetAllIndexes returns the names of all indexes reported by mgr. The returned slice is never
// nil: it is empty when there are no indexes, and also when mgr.GetAllIndexes fails (in which
// case the error is returned alongside it).
func GetAllIndexes(mgr *indexManager) (indexes []string, err error) {
	all, err := mgr.GetAllIndexes()
	if err != nil {
		return []string{}, err
	}

	names := make([]string, 0, len(all))
	for i := range all {
		names = append(names, all[i].Name)
	}
	return names, nil
}