Skip to content

Commit 7e6859c

Browse files
authored
Merge pull request #2323 from josephschorr/watch-disable
Add option to disable watch
2 parents 7919b40 + a4c3f46 commit 7e6859c

File tree

13 files changed

+135
-36
lines changed

13 files changed

+135
-36
lines changed

internal/datastore/crdb/crdb.go

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
187187
filterMaximumIDCount: config.filterMaximumIDCount,
188188
supportsIntegrity: config.withIntegrity,
189189
gcWindow: config.gcWindow,
190+
expirationEnabled: !config.expirationDisabled,
191+
watchEnabled: !config.watchDisabled,
190192
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, config.expirationDisabled),
191193
}
192194
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)
@@ -284,6 +286,8 @@ type crdbDatastore struct {
284286
cancel context.CancelFunc
285287
filterMaximumIDCount uint16
286288
supportsIntegrity bool
289+
expirationEnabled bool
290+
watchEnabled bool
287291
}
288292

289293
func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
@@ -324,25 +328,30 @@ func (cds *crdbDatastore) ReadWriteTx(
324328
Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(querier, cds),
325329
}
326330

331+
// Metadata flag is required if expiration and watch are both enabled,
332+
// to allow expired deletions to be filtered from the watch stream.
333+
requiresMetadataFlag := cds.expirationEnabled && cds.watchEnabled
334+
327335
// Write metadata onto the transaction.
328336
metadata := config.Metadata.AsMap()
337+
if len(metadata) > 0 || requiresMetadataFlag {
338+
// Mark the transaction as coming from SpiceDB. See the comment in watch.go
339+
// for why this is necessary.
340+
metadata[spicedbTransactionKey] = true
341+
342+
expiresAt := time.Now().Add(cds.gcWindow).Add(1 * time.Minute)
343+
insertTransactionMetadata := psql.Insert(schema.TableTransactionMetadata).
344+
Columns(schema.ColExpiresAt, schema.ColMetadata).
345+
Values(expiresAt, metadata)
346+
347+
sql, args, err := insertTransactionMetadata.ToSql()
348+
if err != nil {
349+
return fmt.Errorf("error building metadata insert: %w", err)
350+
}
329351

330-
// Mark the transaction as coming from SpiceDB. See the comment in watch.go
331-
// for why this is necessary.
332-
metadata[spicedbTransactionKey] = true
333-
334-
expiresAt := time.Now().Add(cds.gcWindow).Add(1 * time.Minute)
335-
insertTransactionMetadata := psql.Insert(schema.TableTransactionMetadata).
336-
Columns(schema.ColExpiresAt, schema.ColMetadata).
337-
Values(expiresAt, metadata)
338-
339-
sql, args, err := insertTransactionMetadata.ToSql()
340-
if err != nil {
341-
return fmt.Errorf("error building metadata insert: %w", err)
342-
}
343-
344-
if _, err := tx.Exec(ctx, sql, args...); err != nil {
345-
return fmt.Errorf("error writing metadata: %w", err)
352+
if _, err := tx.Exec(ctx, sql, args...); err != nil {
353+
return fmt.Errorf("error writing metadata: %w", err)
354+
}
346355
}
347356

348357
reader := &crdbReader{
@@ -520,26 +529,30 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
520529
return nil, err
521530
}
522531

523-
// Start a changefeed with an invalid value. If we get back an invalid value error (SQLSTATE 22023)
524-
// then we know that the datastore supports watch. If we get back any other error, then we know that
525-
// the datastore does not support watch emits or there is a permissions issue.
526-
_ = cds.writePool.ExecFunc(ctx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
527-
if err == nil {
528-
return spiceerrors.MustBugf("expected an error, but got none")
529-
}
532+
if cds.watchEnabled {
533+
// Start a changefeed with an invalid value. If we get back an invalid value error (SQLSTATE 22023)
534+
// then we know that the datastore supports watch. If we get back any other error, then we know that
535+
// the datastore does not support watch emits or there is a permissions issue.
536+
_ = cds.writePool.ExecFunc(ctx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
537+
if err == nil {
538+
return spiceerrors.MustBugf("expected an error, but got none")
539+
}
530540

531-
var pgerr *pgconn.PgError
532-
if errors.As(err, &pgerr) {
533-
if pgerr.Code == "22023" {
534-
features.Watch.Status = datastore.FeatureSupported
535-
return nil
541+
var pgerr *pgconn.PgError
542+
if errors.As(err, &pgerr) {
543+
if pgerr.Code == "22023" {
544+
features.Watch.Status = datastore.FeatureSupported
545+
return nil
546+
}
536547
}
537-
}
538548

549+
features.Watch.Status = datastore.FeatureUnsupported
550+
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
551+
return nil
552+
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.schema.RelationshipTableName, head, "-1s"))
553+
} else {
539554
features.Watch.Status = datastore.FeatureUnsupported
540-
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
541-
return nil
542-
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.schema.RelationshipTableName, head, "-1s"))
555+
}
543556

544557
return &features, nil
545558
}

internal/datastore/crdb/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type crdbOptions struct {
3232
columnOptimizationOption common.ColumnOptimizationOption
3333
includeQueryParametersInTraces bool
3434
expirationDisabled bool
35+
watchDisabled bool
3536
}
3637

3738
const (
@@ -62,6 +63,7 @@ const (
6263
defaultColumnOptimizationOption = common.ColumnOptimizationOptionNone
6364
defaultIncludeQueryParametersInTraces = false
6465
defaultExpirationDisabled = false
66+
defaultWatchDisabled = false
6567
)
6668

6769
// Option provides the facility to configure how clients within the CRDB
@@ -88,6 +90,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
8890
columnOptimizationOption: defaultColumnOptimizationOption,
8991
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
9092
expirationDisabled: defaultExpirationDisabled,
93+
watchDisabled: defaultWatchDisabled,
9194
}
9295

9396
for _, option := range options {
@@ -376,3 +379,8 @@ func WithColumnOptimization(isEnabled bool) Option {
376379
func WithExpirationDisabled(isDisabled bool) Option {
377380
return func(po *crdbOptions) { po.expirationDisabled = isDisabled }
378381
}
382+
383+
// WithWatchDisabled configures the datastore to disable watch functionality.
384+
func WithWatchDisabled(isDisabled bool) Option {
385+
return func(po *crdbOptions) { po.watchDisabled = isDisabled }
386+
}

internal/datastore/crdb/watch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev
8282
return updates, errs
8383
}
8484

85+
if !cds.watchEnabled {
86+
close(updates)
87+
errs <- datastore.NewWatchDisabledErr("watch API has been explicitly disabled for this datastore")
88+
return updates, errs
89+
}
90+
8591
if features.Watch.Status != datastore.FeatureSupported {
8692
close(updates)
8793
errs <- datastore.NewWatchDisabledErr(fmt.Sprintf("%s. See https://spicedb.dev/d/enable-watch-api-crdb", features.Watch.Reason))

internal/datastore/mysql/datastore.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option
282282
gcTimeout: config.gcMaxOperationTime,
283283
gcCtx: gcCtx,
284284
cancelGc: cancelGc,
285+
watchEnabled: !config.watchDisabled,
285286
watchBufferLength: config.watchBufferLength,
286287
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
287288
optimizedRevisionQuery: revisionQuery,
@@ -502,6 +503,7 @@ type Datastore struct {
502503
gcTimeout time.Duration
503504
watchBufferLength uint16
504505
watchBufferWriteTimeout time.Duration
506+
watchEnabled bool
505507
maxRetries uint8
506508
filterMaximumIDCount uint16
507509
schema common.SchemaInformation
@@ -574,9 +576,14 @@ func (mds *Datastore) Features(_ context.Context) (*datastore.Features, error) {
574576
}
575577

576578
func (mds *Datastore) OfflineFeatures() (*datastore.Features, error) {
579+
watchSupported := datastore.FeatureUnsupported
580+
if mds.watchEnabled {
581+
watchSupported = datastore.FeatureSupported
582+
}
583+
577584
return &datastore.Features{
578585
Watch: datastore.Feature{
579-
Status: datastore.FeatureSupported,
586+
Status: watchSupported,
580587
},
581588
IntegrityData: datastore.Feature{
582589
Status: datastore.FeatureUnsupported,

internal/datastore/mysql/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
defaultFilterMaximumIDCount = 100
2929
defaultColumnOptimizationOption = common.ColumnOptimizationOptionNone
3030
defaultExpirationDisabled = false
31+
defaultWatchDisabled = false
3132
// no follower delay by default, it should only be set if using read replicas
3233
defaultFollowerReadDelay = 0
3334
)
@@ -55,6 +56,7 @@ type mysqlOptions struct {
5556
allowedMigrations []string
5657
columnOptimizationOption common.ColumnOptimizationOption
5758
expirationDisabled bool
59+
watchDisabled bool
5860
}
5961

6062
// Option provides the facility to configure how clients within the
@@ -81,6 +83,7 @@ func generateConfig(options []Option) (mysqlOptions, error) {
8183
columnOptimizationOption: defaultColumnOptimizationOption,
8284
expirationDisabled: defaultExpirationDisabled,
8385
followerReadDelay: defaultFollowerReadDelay,
86+
watchDisabled: defaultWatchDisabled,
8487
}
8588

8689
for _, option := range options {
@@ -306,3 +309,10 @@ func WithExpirationDisabled(isDisabled bool) Option {
306309
mo.expirationDisabled = isDisabled
307310
}
308311
}
312+
313+
// WithWatchDisabled disables the watch functionality in the MySQL datastore.
314+
func WithWatchDisabled(isDisabled bool) Option {
315+
return func(mo *mysqlOptions) {
316+
mo.watchDisabled = isDisabled
317+
}
318+
}

internal/datastore/mysql/watch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
2929
updates := make(chan datastore.RevisionChanges, watchBufferLength)
3030
errs := make(chan error, 1)
3131

32+
if !mds.watchEnabled {
33+
close(updates)
34+
errs <- datastore.NewWatchDisabledErr("watch is disabled")
35+
return updates, errs
36+
}
37+
3238
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
3339
close(updates)
3440
errs <- errors.New("schema watch unsupported in MySQL")

internal/datastore/postgres/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type postgresOptions struct {
3131
gcEnabled bool
3232
readStrictMode bool
3333
expirationDisabled bool
34+
watchDisabled bool
3435
columnOptimizationOption common.ColumnOptimizationOption
3536
includeQueryParametersInTraces bool
3637
revisionHeartbeatEnabled bool
@@ -76,6 +77,7 @@ const (
7677
defaultColumnOptimizationOption = common.ColumnOptimizationOptionNone
7778
defaultIncludeQueryParametersInTraces = false
7879
defaultExpirationDisabled = false
80+
defaultWatchDisabled = false
7981
// no follower delay by default, it should only be set if using read replicas
8082
defaultFollowerReadDelay = 0
8183
defaultRevisionHeartbeat = true
@@ -104,6 +106,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
104106
columnOptimizationOption: defaultColumnOptimizationOption,
105107
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
106108
expirationDisabled: defaultExpirationDisabled,
109+
watchDisabled: defaultWatchDisabled,
107110
followerReadDelay: defaultFollowerReadDelay,
108111
revisionHeartbeatEnabled: defaultRevisionHeartbeat,
109112
}
@@ -428,3 +431,8 @@ func WithExpirationDisabled(isDisabled bool) Option {
428431
func WithRevisionHeartbeat(isEnabled bool) Option {
429432
return func(po *postgresOptions) { po.revisionHeartbeatEnabled = isEnabled }
430433
}
434+
435+
// WithWatchDisabled disables the watch functionality.
436+
func WithWatchDisabled(isDisabled bool) Option {
437+
return func(po *postgresOptions) { po.watchDisabled = isDisabled }
438+
}

internal/datastore/postgres/postgres.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,13 @@ func newPostgresDatastore(
225225
return nil, err
226226
}
227227

228-
watchEnabled := trackTSOn == "on"
228+
watchEnabled := trackTSOn == "on" && !config.watchDisabled
229229
if !watchEnabled {
230-
log.Warn().Msg("watch API disabled, postgres must be run with track_commit_timestamp=on")
230+
if config.watchDisabled {
231+
log.Warn().Msg("watch API disabled via configuration")
232+
} else {
233+
log.Warn().Msg("watch API disabled, postgres must be run with track_commit_timestamp=on")
234+
}
231235
}
232236

233237
if config.enablePrometheusStats {

internal/datastore/spanner/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type spannerOptions struct {
5656
filterMaximumIDCount uint16
5757
columnOptimizationOption common.ColumnOptimizationOption
5858
expirationDisabled bool
59+
watchDisabled bool
5960
datastoreMetricsOption DatastoreMetricsOption
6061
}
6162

@@ -82,6 +83,7 @@ const (
8283
defaultFilterMaximumIDCount = 100
8384
defaultColumnOptimizationOption = common.ColumnOptimizationOptionNone
8485
defaultExpirationDisabled = false
86+
defaultWatchDisabled = false
8587
)
8688

8789
// Option provides the facility to configure how clients within the Spanner
@@ -107,6 +109,7 @@ func generateConfig(options []Option) (spannerOptions, error) {
107109
filterMaximumIDCount: defaultFilterMaximumIDCount,
108110
columnOptimizationOption: defaultColumnOptimizationOption,
109111
expirationDisabled: defaultExpirationDisabled,
112+
watchDisabled: defaultWatchDisabled,
110113
}
111114

112115
for _, option := range options {
@@ -285,3 +288,10 @@ func WithExpirationDisabled(isDisabled bool) Option {
285288
po.expirationDisabled = isDisabled
286289
}
287290
}
291+
292+
// WithWatchDisabled disables watch support in the Spanner.
293+
func WithWatchDisabled(isDisabled bool) Option {
294+
return func(po *spannerOptions) {
295+
po.watchDisabled = isDisabled
296+
}
297+
}

internal/datastore/spanner/spanner.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type spannerDatastore struct {
8686

8787
watchBufferLength uint16
8888
watchBufferWriteTimeout time.Duration
89+
watchEnabled bool
8990

9091
client *spanner.Client
9192
config spannerOptions
@@ -224,6 +225,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
224225
database: database,
225226
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
226227
watchBufferLength: config.watchBufferLength,
228+
watchEnabled: !config.watchDisabled,
227229
cachedEstimatedBytesPerRelationship: 0,
228230
cachedEstimatedBytesPerRelationshipLock: sync.RWMutex{},
229231
tableSizesStatsTable: tableSizesStatsTable,
@@ -373,9 +375,14 @@ func (sd *spannerDatastore) Features(ctx context.Context) (*datastore.Features,
373375
}
374376

375377
func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) {
378+
watchSupported := datastore.FeatureUnsupported
379+
if sd.watchEnabled {
380+
watchSupported = datastore.FeatureSupported
381+
}
382+
376383
return &datastore.Features{
377384
Watch: datastore.Feature{
378-
Status: datastore.FeatureSupported,
385+
Status: watchSupported,
379386
},
380387
IntegrityData: datastore.Feature{
381388
Status: datastore.FeatureUnsupported,

internal/datastore/spanner/watch.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ func (sd *spannerDatastore) watch(
107107
errs <- err
108108
}
109109

110+
if !sd.watchEnabled {
111+
sendError(datastore.NewWatchDisabledErr("watch disabled in this datastore"))
112+
return
113+
}
114+
110115
watchBufferWriteTimeout := opts.WatchBufferWriteTimeout
111116
if watchBufferWriteTimeout <= 0 {
112117
watchBufferWriteTimeout = sd.watchBufferWriteTimeout

0 commit comments

Comments
 (0)