diff --git a/CHANGELOG.md b/CHANGELOG.md index b90d2cf732a..18c9aaf4456 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525 * [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586 * [ENHANCEMENT] Server: The `/metrics` endpoint now supports metrics filtering by providing one or more `name[]` query parameters. #13746 +* [ENHANCEMENT] Bucket storage: Add support for GCS rate limiting with exponential ramping following Google Cloud Storage best practices. Enable upload rate limiting with `-gcs.upload-rate-limit-enabled` and configure with `-gcs.upload-initial-qps`, `-gcs.upload-max-qps`, and `-gcs.upload-ramp-period`. Enable read rate limiting with `-gcs.read-rate-limit-enabled` and configure with `-gcs.read-initial-qps`, `-gcs.read-max-qps`, and `-gcs.read-ramp-period`. #13703 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053 * [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084 * [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 02ef88fddf3..412a993c523 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -9095,6 +9095,94 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "upload_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS uploads. 
When enabled, uploads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.gcs.upload-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 1000, + "fieldFlag": "blocks-storage.gcs.upload-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS uploads.", + "fieldValue": null, + "fieldDefaultValue": 3200, + "fieldFlag": "blocks-storage.gcs.upload-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_ramp_period", + "required": false, + "desc": "Time period over which the upload rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "blocks-storage.gcs.upload-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.gcs.read-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS reads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 5000, + "fieldFlag": "blocks-storage.gcs.read-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS reads.", + "fieldValue": null, + "fieldDefaultValue": 16000, + "fieldFlag": "blocks-storage.gcs.read-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_ramp_period", + "required": false, + "desc": "Time period over which the read rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "blocks-storage.gcs.read-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "block", "name": "http", @@ -14949,6 +15037,94 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "upload_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "ruler-storage.gcs.upload-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS uploads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 1000, + "fieldFlag": "ruler-storage.gcs.upload-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS uploads.", + "fieldValue": null, + "fieldDefaultValue": 3200, + "fieldFlag": "ruler-storage.gcs.upload-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_ramp_period", + "required": false, + "desc": "Time period over which the upload rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "ruler-storage.gcs.upload-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "ruler-storage.gcs.read-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS reads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 5000, + "fieldFlag": "ruler-storage.gcs.read-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS reads.", + "fieldValue": null, + "fieldDefaultValue": 16000, + "fieldFlag": "ruler-storage.gcs.read-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_ramp_period", + "required": false, + "desc": "Time period over which the read rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "ruler-storage.gcs.read-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "block", "name": "http", @@ -17240,6 +17416,94 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "upload_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "alertmanager-storage.gcs.upload-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS uploads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 1000, + "fieldFlag": "alertmanager-storage.gcs.upload-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS uploads.", + "fieldValue": null, + "fieldDefaultValue": 3200, + "fieldFlag": "alertmanager-storage.gcs.upload-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_ramp_period", + "required": false, + "desc": "Time period over which the upload rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "alertmanager-storage.gcs.upload-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "alertmanager-storage.gcs.read-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS reads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 5000, + "fieldFlag": "alertmanager-storage.gcs.read-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS reads.", + "fieldValue": null, + "fieldDefaultValue": 16000, + "fieldFlag": "alertmanager-storage.gcs.read-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_ramp_period", + "required": false, + "desc": "Time period over which the read rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "alertmanager-storage.gcs.read-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "block", "name": "http", @@ -20132,6 +20396,94 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "upload_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "common.storage.gcs.upload-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS uploads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 1000, + "fieldFlag": "common.storage.gcs.upload-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS uploads.", + "fieldValue": null, + "fieldDefaultValue": 3200, + "fieldFlag": "common.storage.gcs.upload-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "upload_ramp_period", + "required": false, + "desc": "Time period over which the upload rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "common.storage.gcs.upload-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_rate_limit_enabled", + "required": false, + "desc": "Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "common.storage.gcs.read-rate-limit-enabled", + "fieldType": "boolean", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_initial_qps", + "required": false, + "desc": "Initial queries per second limit for GCS reads. 
The rate doubles every ramp period until it reaches the maximum.", + "fieldValue": null, + "fieldDefaultValue": 5000, + "fieldFlag": "common.storage.gcs.read-initial-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_max_qps", + "required": false, + "desc": "Maximum queries per second limit for GCS reads.", + "fieldValue": null, + "fieldDefaultValue": 16000, + "fieldFlag": "common.storage.gcs.read-max-qps", + "fieldType": "int", + "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "read_ramp_period", + "required": false, + "desc": "Time period over which the read rate doubles, following the Google recommendation.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "common.storage.gcs.read-ramp-period", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "block", "name": "http", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 0ec43dfcb4a..3ce4e84da0f 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -73,10 +73,26 @@ Usage of ./cmd/mimir/mimir: Maximum number of idle (keep-alive) connections to keep per-host. Set to 0 to use a built-in default value of 2. (default 100) -alertmanager-storage.gcs.max-retries int Maximum number of attempts for GCS operations (0 = unlimited, 1 = no retries). Applies to both regular and upload retry modes. (default 20) + -alertmanager-storage.gcs.read-initial-qps int + Initial queries per second limit for GCS reads. The rate doubles every ramp period until it reaches the maximum. (default 5000) + -alertmanager-storage.gcs.read-max-qps int + Maximum queries per second limit for GCS reads. (default 16000) + -alertmanager-storage.gcs.read-ramp-period duration + Time period over which the read rate doubles, following the Google recommendation. (default 20m0s) + -alertmanager-storage.gcs.read-rate-limit-enabled + Enable rate limiting for GCS reads. 
When enabled, reads gradually ramp up following Google Cloud Storage best practices. -alertmanager-storage.gcs.service-account string JSON either from a Google Developers Console client_credentials.json file, or a Google Developers service account key. Needs to be valid JSON, not a filesystem path. -alertmanager-storage.gcs.tls-handshake-timeout duration Maximum time to wait for a TLS handshake. Set to 0 for no limit. (default 10s) + -alertmanager-storage.gcs.upload-initial-qps int + Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum. (default 1000) + -alertmanager-storage.gcs.upload-max-qps int + Maximum queries per second limit for GCS uploads. (default 3200) + -alertmanager-storage.gcs.upload-ramp-period duration + Time period over which the upload rate doubles, following the Google recommendation. (default 20m0s) + -alertmanager-storage.gcs.upload-rate-limit-enabled + Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices. -alertmanager-storage.local.path string Path at which alertmanager configurations are stored. -alertmanager-storage.s3.access-key-id string @@ -657,10 +673,26 @@ Usage of ./cmd/mimir/mimir: Maximum number of idle (keep-alive) connections to keep per-host. Set to 0 to use a built-in default value of 2. (default 100) -blocks-storage.gcs.max-retries int Maximum number of attempts for GCS operations (0 = unlimited, 1 = no retries). Applies to both regular and upload retry modes. (default 20) + -blocks-storage.gcs.read-initial-qps int + Initial queries per second limit for GCS reads. The rate doubles every ramp period until it reaches the maximum. (default 5000) + -blocks-storage.gcs.read-max-qps int + Maximum queries per second limit for GCS reads. (default 16000) + -blocks-storage.gcs.read-ramp-period duration + Time period over which the read rate doubles, following the Google recommendation. 
(default 20m0s) + -blocks-storage.gcs.read-rate-limit-enabled + Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices. -blocks-storage.gcs.service-account string JSON either from a Google Developers Console client_credentials.json file, or a Google Developers service account key. Needs to be valid JSON, not a filesystem path. -blocks-storage.gcs.tls-handshake-timeout duration Maximum time to wait for a TLS handshake. Set to 0 for no limit. (default 10s) + -blocks-storage.gcs.upload-initial-qps int + Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum. (default 1000) + -blocks-storage.gcs.upload-max-qps int + Maximum queries per second limit for GCS uploads. (default 3200) + -blocks-storage.gcs.upload-ramp-period duration + Time period over which the upload rate doubles, following the Google recommendation. (default 20m0s) + -blocks-storage.gcs.upload-rate-limit-enabled + Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices. -blocks-storage.s3.access-key-id string S3 access key ID -blocks-storage.s3.bucket-lookup-type value @@ -927,10 +959,26 @@ Usage of ./cmd/mimir/mimir: Maximum number of idle (keep-alive) connections to keep per-host. Set to 0 to use a built-in default value of 2. (default 100) -common.storage.gcs.max-retries int Maximum number of attempts for GCS operations (0 = unlimited, 1 = no retries). Applies to both regular and upload retry modes. (default 20) + -common.storage.gcs.read-initial-qps int + Initial queries per second limit for GCS reads. The rate doubles every ramp period until it reaches the maximum. (default 5000) + -common.storage.gcs.read-max-qps int + Maximum queries per second limit for GCS reads. 
(default 16000) + -common.storage.gcs.read-ramp-period duration + Time period over which the read rate doubles, following the Google recommendation. (default 20m0s) + -common.storage.gcs.read-rate-limit-enabled + Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices. -common.storage.gcs.service-account string JSON either from a Google Developers Console client_credentials.json file, or a Google Developers service account key. Needs to be valid JSON, not a filesystem path. -common.storage.gcs.tls-handshake-timeout duration Maximum time to wait for a TLS handshake. Set to 0 for no limit. (default 10s) + -common.storage.gcs.upload-initial-qps int + Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum. (default 1000) + -common.storage.gcs.upload-max-qps int + Maximum queries per second limit for GCS uploads. (default 3200) + -common.storage.gcs.upload-ramp-period duration + Time period over which the upload rate doubles, following the Google recommendation. (default 20m0s) + -common.storage.gcs.upload-rate-limit-enabled + Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices. -common.storage.s3.access-key-id string S3 access key ID -common.storage.s3.bucket-lookup-type value @@ -2753,10 +2801,26 @@ Usage of ./cmd/mimir/mimir: Maximum number of idle (keep-alive) connections to keep per-host. Set to 0 to use a built-in default value of 2. (default 100) -ruler-storage.gcs.max-retries int Maximum number of attempts for GCS operations (0 = unlimited, 1 = no retries). Applies to both regular and upload retry modes. (default 20) + -ruler-storage.gcs.read-initial-qps int + Initial queries per second limit for GCS reads. The rate doubles every ramp period until it reaches the maximum. 
(default 5000) + -ruler-storage.gcs.read-max-qps int + Maximum queries per second limit for GCS reads. (default 16000) + -ruler-storage.gcs.read-ramp-period duration + Time period over which the read rate doubles, following the Google recommendation. (default 20m0s) + -ruler-storage.gcs.read-rate-limit-enabled + Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices. -ruler-storage.gcs.service-account string JSON either from a Google Developers Console client_credentials.json file, or a Google Developers service account key. Needs to be valid JSON, not a filesystem path. -ruler-storage.gcs.tls-handshake-timeout duration Maximum time to wait for a TLS handshake. Set to 0 for no limit. (default 10s) + -ruler-storage.gcs.upload-initial-qps int + Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum. (default 1000) + -ruler-storage.gcs.upload-max-qps int + Maximum queries per second limit for GCS uploads. (default 3200) + -ruler-storage.gcs.upload-ramp-period duration + Time period over which the upload rate doubles, following the Google recommendation. (default 20m0s) + -ruler-storage.gcs.upload-rate-limit-enabled + Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices. 
-ruler-storage.local.directory string Directory to scan for rules -ruler-storage.s3.access-key-id string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 8bcc2688e99..401400fabcc 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -6014,6 +6014,44 @@ The gcs_backend block configures the connection to Google Cloud Storage object s # CLI flag: -.gcs.max-retries [max_retries: | default = 20] +# (advanced) Enable rate limiting for GCS uploads. When enabled, uploads +# gradually ramp up following Google Cloud Storage best practices. +# CLI flag: -.gcs.upload-rate-limit-enabled +[upload_rate_limit_enabled: | default = false] + +# (advanced) Initial queries per second limit for GCS uploads. The rate doubles +# every ramp period until it reaches the maximum. +# CLI flag: -.gcs.upload-initial-qps +[upload_initial_qps: | default = 1000] + +# (advanced) Maximum queries per second limit for GCS uploads. +# CLI flag: -.gcs.upload-max-qps +[upload_max_qps: | default = 3200] + +# (advanced) Time period over which the upload rate doubles, following the +# Google recommendation. +# CLI flag: -.gcs.upload-ramp-period +[upload_ramp_period: | default = 20m] + +# (advanced) Enable rate limiting for GCS reads. When enabled, reads gradually +# ramp up following Google Cloud Storage best practices. +# CLI flag: -.gcs.read-rate-limit-enabled +[read_rate_limit_enabled: | default = false] + +# (advanced) Initial queries per second limit for GCS reads. The rate doubles +# every ramp period until it reaches the maximum. +# CLI flag: -.gcs.read-initial-qps +[read_initial_qps: | default = 5000] + +# (advanced) Maximum queries per second limit for GCS reads. 
+# CLI flag: -.gcs.read-max-qps +[read_max_qps: | default = 16000] + +# (advanced) Time period over which the read rate doubles, following the Google +# recommendation. +# CLI flag: -.gcs.read-ramp-period +[read_ramp_period: | default = 20m] + http: # (advanced) The time an idle connection remains idle before closing. # CLI flag: -.gcs.http.idle-conn-timeout diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 3b020a6732a..709054b8d01 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -625,6 +625,12 @@ "blocks-storage.gcs.bucket-name": "", "blocks-storage.gcs.service-account": "", "blocks-storage.gcs.max-retries": 20, + "blocks-storage.gcs.upload-initial-qps": 1000, + "blocks-storage.gcs.upload-max-qps": 3200, + "blocks-storage.gcs.upload-ramp-period": 1200000000000, + "blocks-storage.gcs.read-initial-qps": 5000, + "blocks-storage.gcs.read-max-qps": 16000, + "blocks-storage.gcs.read-ramp-period": 1200000000000, "blocks-storage.gcs.http.idle-conn-timeout": 90000000000, "blocks-storage.gcs.http.response-header-timeout": 120000000000, "blocks-storage.gcs.tls-handshake-timeout": 10000000000, @@ -1037,6 +1043,12 @@ "ruler-storage.gcs.bucket-name": "", "ruler-storage.gcs.service-account": "", "ruler-storage.gcs.max-retries": 20, + "ruler-storage.gcs.upload-initial-qps": 1000, + "ruler-storage.gcs.upload-max-qps": 3200, + "ruler-storage.gcs.upload-ramp-period": 1200000000000, + "ruler-storage.gcs.read-initial-qps": 5000, + "ruler-storage.gcs.read-max-qps": 16000, + "ruler-storage.gcs.read-ramp-period": 1200000000000, "ruler-storage.gcs.http.idle-conn-timeout": 90000000000, "ruler-storage.gcs.http.response-header-timeout": 120000000000, "ruler-storage.gcs.tls-handshake-timeout": 10000000000, @@ -1213,6 +1225,12 @@ "alertmanager-storage.gcs.bucket-name": "", "alertmanager-storage.gcs.service-account": "", "alertmanager-storage.gcs.max-retries": 20, + 
"alertmanager-storage.gcs.upload-initial-qps": 1000, + "alertmanager-storage.gcs.upload-max-qps": 3200, + "alertmanager-storage.gcs.upload-ramp-period": 1200000000000, + "alertmanager-storage.gcs.read-initial-qps": 5000, + "alertmanager-storage.gcs.read-max-qps": 16000, + "alertmanager-storage.gcs.read-ramp-period": 1200000000000, "alertmanager-storage.gcs.http.idle-conn-timeout": 90000000000, "alertmanager-storage.gcs.http.response-header-timeout": 120000000000, "alertmanager-storage.gcs.tls-handshake-timeout": 10000000000, @@ -1423,6 +1441,12 @@ "common.storage.gcs.bucket-name": "", "common.storage.gcs.service-account": "", "common.storage.gcs.max-retries": 20, + "common.storage.gcs.upload-initial-qps": 1000, + "common.storage.gcs.upload-max-qps": 3200, + "common.storage.gcs.upload-ramp-period": 1200000000000, + "common.storage.gcs.read-initial-qps": 5000, + "common.storage.gcs.read-max-qps": 16000, + "common.storage.gcs.read-ramp-period": 1200000000000, "common.storage.gcs.http.idle-conn-timeout": 90000000000, "common.storage.gcs.http.response-header-timeout": 120000000000, "common.storage.gcs.tls-handshake-timeout": 10000000000, diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 129019a7e0d..f159d9b7697 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -120,6 +120,12 @@ func (cfg *StorageBackendConfig) Validate() error { } } + if cfg.Backend == GCS { + if err := cfg.GCS.Validate(); err != nil { + return err + } + } + return nil } @@ -170,7 +176,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, case S3: backendClient, err = s3.NewBucketClient(cfg.S3, name, logger) case GCS: - backendClient, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + backendClient, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger, reg) case Azure: backendClient, err = azure.NewBucketClient(cfg.Azure, name, logger) case Swift: diff --git a/pkg/storage/bucket/gcs/bucket_client.go 
b/pkg/storage/bucket/gcs/bucket_client.go index c33553f384a..e59ba8a6e2e 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -7,19 +7,27 @@ package gcs import ( "context" + "errors" + "fmt" "io" + "net/http" "cloud.google.com/go/storage" "github.com/go-kit/log" - "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/gcs" + "google.golang.org/api/googleapi" ) // NewBucketClient creates a new GCS bucket client. // If cfg.EnableUploadRetries is true, all Upload operations will automatically be retried // on transient errors using the GCS RetryAlways policy. -func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { +// If cfg.UploadRateLimitEnabled is true, uploads will be rate +// limited following Google Cloud Storage best practices for upload request rate ramping. +// If cfg.ReadRateLimitEnabled is true, reads will be rate +// limited following Google Cloud Storage best practices for read request rate ramping. +func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { bucketConfig := gcs.Config{ Bucket: cfg.BucketName, ServiceAccount: cfg.ServiceAccount.String(), @@ -28,22 +36,36 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo } gcsBucket, err := gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil) if err != nil { - return nil, errors.Wrap(err, "NewBucketClient: create bucket") + return nil, fmt.Errorf("NewBucketClient: create bucket: %w", err) } - if !cfg.EnableUploadRetries { - return gcsBucket, nil + var bucket objstore.Bucket = gcsBucket + + // Apply retry wrapper if enabled. 
+ if cfg.EnableUploadRetries { + retryOpts := []storage.RetryOption{storage.WithPolicy(storage.RetryAlways)} + if cfg.MaxRetries > 0 { + retryOpts = append(retryOpts, storage.WithMaxAttempts(cfg.MaxRetries)) + } + bucket = &retryAlwaysBucket{ + Bucket: gcsBucket, + bkt: gcsBucket.Handle().Retryer(retryOpts...), + chunkSize: bucketConfig.ChunkSizeBytes, + } } - retryOpts := []storage.RetryOption{storage.WithPolicy(storage.RetryAlways)} - if cfg.MaxRetries > 0 { - retryOpts = append(retryOpts, storage.WithMaxAttempts(cfg.MaxRetries)) + // Apply rate limiting wrapper if enabled. + if cfg.UploadRateLimitEnabled || cfg.ReadRateLimitEnabled { + rlb := &rateLimitedBucket{Bucket: bucket} + if cfg.UploadRateLimitEnabled { + rlb.uploadLimiter = newRateLimiter(name, cfg.UploadInitialQPS, cfg.UploadMaxQPS, cfg.UploadRampPeriod, uploadRateLimiter, reg) + } + if cfg.ReadRateLimitEnabled { + rlb.readLimiter = newRateLimiter(name, cfg.ReadInitialQPS, cfg.ReadMaxQPS, cfg.ReadRampPeriod, readRateLimiter, reg) + } + bucket = rlb } - return &retryAlwaysBucket{ - Bucket: gcsBucket, - bkt: gcsBucket.Handle().Retryer(retryOpts...), - chunkSize: bucketConfig.ChunkSizeBytes, - }, nil + return bucket, nil } // retryAlwaysBucket wraps a GCS bucket to automatically retry Upload operations @@ -69,12 +91,164 @@ func (b *retryAlwaysBucket) Upload(ctx context.Context, name string, r io.Reader if _, err := io.Copy(w, r); err != nil { _ = w.Close() - return errors.Wrap(err, "write object") + return fmt.Errorf("write object: %w", err) } if err := w.Close(); err != nil { - return errors.Wrap(err, "close writer") + return fmt.Errorf("close writer: %w", err) } return nil } + +// rateLimitedBucket wraps a GCS bucket to rate limit uploads and/or reads. +// This implements exponential doubling of request rates following Google Cloud Storage +// best practices for ramping up request rates. 
+// +// The bucket also implements adaptive rate limiting: when GCS returns a 429 +// rate limit error, the rate limiter backs off by halving the current QPS. +type rateLimitedBucket struct { + objstore.Bucket + uploadLimiter *rateLimiter + readLimiter *rateLimiter +} + +// isRateLimitError checks if the error is a GCS rate limit error (HTTP 429). +func isRateLimitError(err error) bool { + if err == nil { + return false + } + var googleErr *googleapi.Error + if errors.As(err, &googleErr) { + return googleErr.Code == http.StatusTooManyRequests + } + return false +} + +// Upload performs a rate-limited upload. It waits for rate limiter approval before +// delegating to the underlying bucket's Upload method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error { + if b.uploadLimiter != nil { + if err := b.uploadLimiter.Wait(ctx); err != nil { + return fmt.Errorf("bucket upload rate limiter wait: %w", err) + } + } + err := b.Bucket.Upload(ctx, name, r, opts...) + if isRateLimitError(err) && b.uploadLimiter != nil { + b.uploadLimiter.Backoff() + } + return err +} + +// Get performs a rate-limited get. It waits for rate limiter approval before +// delegating to the underlying bucket's Get method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return nil, fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + rc, err := b.Bucket.Get(ctx, name) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return rc, err +} + +// GetRange performs a rate-limited range get. It waits for rate limiter approval before +// delegating to the underlying bucket's GetRange method. 
If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return nil, fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + rc, err := b.Bucket.GetRange(ctx, name, off, length) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return rc, err +} + +// Exists performs a rate-limited existence check. It waits for rate limiter approval before +// delegating to the underlying bucket's Exists method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) Exists(ctx context.Context, name string) (bool, error) { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return false, fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + exists, err := b.Bucket.Exists(ctx, name) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return exists, err +} + +// Attributes performs a rate-limited attributes lookup. It waits for rate limiter approval before +// delegating to the underlying bucket's Attributes method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return objstore.ObjectAttributes{}, fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + attrs, err := b.Bucket.Attributes(ctx, name) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return attrs, err +} + +// Iter performs a rate-limited iteration. It waits for rate limiter approval before +// delegating to the underlying bucket's Iter method. 
If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + err := b.Bucket.Iter(ctx, dir, f, options...) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return err +} + +// IterWithAttributes performs a rate-limited iteration with attributes. It waits for rate limiter approval before +// delegating to the underlying bucket's IterWithAttributes method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. +func (b *rateLimitedBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if b.readLimiter != nil { + if err := b.readLimiter.Wait(ctx); err != nil { + return fmt.Errorf("bucket read rate limiter wait: %w", err) + } + } + err := b.Bucket.IterWithAttributes(ctx, dir, f, options...) + if isRateLimitError(err) && b.readLimiter != nil { + b.readLimiter.Backoff() + } + return err +} + +// Delete performs a rate-limited delete. It waits for rate limiter approval before +// delegating to the underlying bucket's Delete method. If GCS returns a 429 rate +// limit error, the rate limiter backs off. 
+func (b *rateLimitedBucket) Delete(ctx context.Context, name string) error { + if b.uploadLimiter != nil { + if err := b.uploadLimiter.Wait(ctx); err != nil { + return fmt.Errorf("bucket upload rate limiter wait: %w", err) + } + } + err := b.Bucket.Delete(ctx, name) + if isRateLimitError(err) && b.uploadLimiter != nil { + b.uploadLimiter.Backoff() + } + return err +} diff --git a/pkg/storage/bucket/gcs/bucket_client_test.go b/pkg/storage/bucket/gcs/bucket_client_test.go new file mode 100644 index 00000000000..5e210090523 --- /dev/null +++ b/pkg/storage/bucket/gcs/bucket_client_test.go @@ -0,0 +1,831 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package gcs + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "google.golang.org/api/googleapi" +) + +// mockBucket is a simple mock implementation of objstore.Bucket for testing. +type mockBucket struct { + mu sync.Mutex + uploadCount int + getCount int + getRangeCount int + existsCount int + attributesCount int + iterCount int + iterWithAttributesCount int + deleteCount int + uploadFunc func(ctx context.Context, name string, r io.Reader) error + getFunc func(ctx context.Context, name string) (io.ReadCloser, error) + getRangeFunc func(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) +} + +func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader, _ ...objstore.ObjectUploadOption) error { + m.mu.Lock() + m.uploadCount++ + m.mu.Unlock() + if m.uploadFunc != nil { + return m.uploadFunc(ctx, name, r) + } + // Read the data to simulate actual upload. 
+ _, err := io.ReadAll(r) + return err +} + +func (m *mockBucket) Close() error { return nil } +func (m *mockBucket) Name() string { return "mock" } +func (m *mockBucket) Provider() objstore.ObjProvider { return "MOCK" } +func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + m.iterCount++ + return nil +} +func (m *mockBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + m.iterWithAttributesCount++ + return nil +} +func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType { return nil } +func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + m.getCount++ + if m.getFunc != nil { + return m.getFunc(ctx, name) + } + return io.NopCloser(strings.NewReader("test data")), nil +} +func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + m.getRangeCount++ + if m.getRangeFunc != nil { + return m.getRangeFunc(ctx, name, off, length) + } + return io.NopCloser(strings.NewReader("test data")), nil +} +func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) { + m.existsCount++ + return true, nil +} +func (m *mockBucket) IsObjNotFoundErr(err error) bool { return false } +func (m *mockBucket) IsAccessDeniedErr(err error) bool { return false } +func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + m.attributesCount++ + return objstore.ObjectAttributes{Size: 100}, nil +} +func (m *mockBucket) Delete(ctx context.Context, name string) error { + m.deleteCount++ + return nil +} + +func TestRateLimitedBucket(t *testing.T) { + t.Run("Upload", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 10, 10, 
20*time.Minute, uploadRateLimiter, reg), + } + + ctx := context.Background() + data := bytes.NewBufferString("test data") + + // First upload should succeed + err := bucket.Upload(ctx, "test-object", data, nil) + require.NoError(t, err) + assert.Equal(t, 1, mock.uploadCount) + }) + + t.Run("waits when rate limit exceeded", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 5, 5, 20*time.Minute, uploadRateLimiter, reg), + } + + ctx := context.Background() + + // Consume all initial burst tokens (burst = 2 * maxQPS = 10). + for range 10 { + data := bytes.NewBufferString("test data") + err := bucket.Upload(ctx, "test-object", data, nil) + require.NoError(t, err) + } + + // Next upload should be rate limited. + start := time.Now() + data := bytes.NewBufferString("test data") + err := bucket.Upload(ctx, "test-object", data, nil) + require.NoError(t, err) + elapsed := time.Since(start) + + // Should have waited for rate limiter (at 5 QPS, ~200ms per token). + assert.Greater(t, elapsed, 100*time.Millisecond) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, uploadRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + err := bucket.Upload(context.Background(), "test", bytes.NewBufferString("data"), nil) + require.NoError(t, err) + } + + // Create cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Upload should fail with context error + data := bytes.NewBufferString("test data") + err := bucket.Upload(ctx, "test-object", data, nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + }) + + t.Run("is safe for concurrent use", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 20, 20, 20*time.Minute, uploadRateLimiter, reg), + } + + ctx := context.Background() + numUploads := 50 + done := make(chan error, numUploads) + + start := time.Now() + + // Launch concurrent uploads + for i := 0; i < numUploads; i++ { + go func(i int) { + data := bytes.NewBufferString("test data") + err := bucket.Upload(ctx, "test-object", data, nil) + done <- err + }(i) + } + + // Wait for all uploads + for i := 0; i < numUploads; i++ { + err := <-done + require.NoError(t, err) + } + + elapsed := time.Since(start) + + // All uploads should complete + assert.Equal(t, numUploads, mock.uploadCount) + + // Should take some time due to rate limiting (50 uploads at 20 QPS with burst = ~2.5 seconds) + assert.Less(t, elapsed, 5*time.Second) + }) + }) + + t.Run("Get", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // First get should succeed + reader, err := bucket.Get(ctx, "test-object") + require.NoError(t, err) + require.NotNil(t, reader) + _ = reader.Close() + assert.Equal(t, 1, mock.getCount) + }) + + t.Run("waits when rate limit exceeded", func(t 
*testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 5, 5, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // Consume all initial burst tokens (burst = 2 * maxQPS = 10). + for range 10 { + reader, err := bucket.Get(ctx, "test-object") + require.NoError(t, err) + require.NoError(t, reader.Close()) + } + + // Next get should be rate limited. + start := time.Now() + reader, err := bucket.Get(ctx, "test-object") + require.NoError(t, err) + _ = reader.Close() + elapsed := time.Since(start) + + // Should have waited for rate limiter (at 5 QPS, ~200ms per token). + assert.Greater(t, elapsed, 100*time.Millisecond) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, readRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + reader, err := bucket.Get(context.Background(), "test") + require.NoError(t, err) + if reader != nil { + require.NoError(t, reader.Close()) + } + } + + // Create cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Get should fail with context error + _, err := bucket.Get(ctx, "test-object") + assert.Error(t, err) + assert.Contains(t, err.Error(), "context canceled") + }) + }) + + t.Run("GetRange", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // First get range should succeed + reader, err := bucket.GetRange(ctx, "test-object", 0, 100) + require.NoError(t, err) + require.NotNil(t, reader) + _ = reader.Close() + assert.Equal(t, 1, mock.getRangeCount) + }) + + t.Run("waits when rate limit exceeded", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 5, 5, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // Consume all initial burst tokens (burst = 2 * maxQPS = 10). + for range 10 { + reader, err := bucket.GetRange(ctx, "test-object", 0, 100) + require.NoError(t, err) + require.NoError(t, reader.Close()) + } + + // Next get range should be rate limited. + start := time.Now() + reader, err := bucket.GetRange(ctx, "test-object", 0, 100) + require.NoError(t, err) + require.NoError(t, reader.Close()) + elapsed := time.Since(start) + + // Should have waited for rate limiter (at 5 QPS, ~200ms per token). 
+ assert.Greater(t, elapsed, 100*time.Millisecond) + }) + }) + + t.Run("Exists", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + exists, err := bucket.Exists(ctx, "test-object") + require.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, 1, mock.existsCount) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, readRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). + for range 2 { + _, err := bucket.Exists(context.Background(), "test") + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := bucket.Exists(ctx, "test-object") + assert.ErrorIs(t, err, context.Canceled) + }) + }) + + t.Run("Attributes", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + attrs, err := bucket.Attributes(ctx, "test-object") + require.NoError(t, err) + assert.Equal(t, int64(100), attrs.Size) + assert.Equal(t, 1, mock.attributesCount) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, readRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + _, err := bucket.Attributes(context.Background(), "test") + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := bucket.Attributes(ctx, "test-object") + assert.ErrorIs(t, err, context.Canceled) + }) + }) + + t.Run("Iter", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + ctx := context.Background() + + err := bucket.Iter(ctx, "dir/", func(name string) error { return nil }) + require.NoError(t, err) + assert.Equal(t, 1, mock.iterCount) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, readRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + err := bucket.Iter(context.Background(), "dir/", func(name string) error { return nil }) + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := bucket.Iter(ctx, "dir/", func(name string) error { return nil }) + assert.ErrorIs(t, err, context.Canceled) + }) + }) + + t.Run("IterWithAttributes", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + ctx := context.Background() + + err := bucket.IterWithAttributes(ctx, "dir/", func(attrs objstore.IterObjectAttributes) error { return nil }) + require.NoError(t, err) + assert.Equal(t, 1, mock.iterWithAttributesCount) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + readLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, readRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + err := bucket.IterWithAttributes(context.Background(), "dir/", func(attrs objstore.IterObjectAttributes) error { return nil }) + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := bucket.IterWithAttributes(ctx, "dir/", func(attrs objstore.IterObjectAttributes) error { return nil }) + assert.ErrorIs(t, err, context.Canceled) + }) + }) + + t.Run("Delete", func(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, uploadRateLimiter, reg), + } + ctx := context.Background() + + err := bucket.Delete(ctx, "test-object") + require.NoError(t, err) + assert.Equal(t, 1, mock.deleteCount) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 1, 1, 20*time.Minute, uploadRateLimiter, reg), + } + + // Consume all burst tokens (burst = 2 * maxQPS = 2). 
+ for range 2 { + err := bucket.Delete(context.Background(), "test") + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := bucket.Delete(ctx, "test-object") + assert.ErrorIs(t, err, context.Canceled) + }) + }) + + t.Run("all operations succeed with both limiters enabled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, uploadRateLimiter, reg), + readLimiter: newRateLimiter("test", 20, 20, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // Upload should use upload limiter + err := bucket.Upload(ctx, "test-object", bytes.NewBufferString("data"), nil) + require.NoError(t, err) + assert.Equal(t, 1, mock.uploadCount) + + // Get should use read limiter + reader, err := bucket.Get(ctx, "test-object") + require.NoError(t, err) + _ = reader.Close() + assert.Equal(t, 1, mock.getCount) + + // GetRange should use read limiter + reader, err = bucket.GetRange(ctx, "test-object", 0, 100) + require.NoError(t, err) + _ = reader.Close() + assert.Equal(t, 1, mock.getRangeCount) + + exists, err := bucket.Exists(ctx, "test-object") + require.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, 1, mock.existsCount) + + attrs, err := bucket.Attributes(ctx, "test-object") + require.NoError(t, err) + assert.Equal(t, int64(100), attrs.Size) + assert.Equal(t, 1, mock.attributesCount) + }) + + t.Run("reads bypass rate limiting when only upload limiter enabled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + uploadLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, uploadRateLimiter, reg), + // readLimiter is nil + } + + ctx := context.Background() + + // Get should work without rate limiting + reader, err := bucket.Get(ctx, "test-object") + 
require.NoError(t, err) + _ = reader.Close() + assert.Equal(t, 1, mock.getCount) + }) + + t.Run("uploads bypass rate limiting when only read limiter enabled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + mock := &mockBucket{} + + bucket := &rateLimitedBucket{ + Bucket: mock, + // uploadLimiter is nil + readLimiter: newRateLimiter("test", 10, 10, 20*time.Minute, readRateLimiter, reg), + } + + ctx := context.Background() + + // Upload should work without rate limiting + err := bucket.Upload(ctx, "test-object", bytes.NewBufferString("data"), nil) + require.NoError(t, err) + assert.Equal(t, 1, mock.uploadCount) + }) +} + +func TestConfig_Validate(t *testing.T) { + tests := []struct { + name string + cfg Config + expectedErr error + }{ + { + name: "valid config with upload rate limiting enabled", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: 1000, + UploadRampPeriod: 20 * time.Minute, + }, + }, + { + name: "valid config with read rate limiting enabled", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 500, + ReadMaxQPS: 1000, + ReadRampPeriod: 20 * time.Minute, + }, + }, + { + name: "valid config with both types of rate limiting enabled", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: 1000, + UploadRampPeriod: 20 * time.Minute, + ReadRateLimitEnabled: true, + ReadInitialQPS: 1000, + ReadMaxQPS: 2000, + ReadRampPeriod: 20 * time.Minute, + }, + }, + { + name: "valid config with rate limiting disabled", + cfg: Config{ + UploadRateLimitEnabled: false, + UploadInitialQPS: 0, // Invalid value is OK when disabled. + UploadMaxQPS: 0, // Invalid value is OK when disabled. + UploadRampPeriod: 0, // Invalid value is OK when disabled. 
+ ReadRateLimitEnabled: false, + ReadInitialQPS: 0, + ReadMaxQPS: 0, + ReadRampPeriod: 0, + }, + }, + { + name: "invalid config: upload rate limiting enabled with zero initial QPS", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 0, + UploadMaxQPS: 1000, + UploadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidUploadInitialQPS, + }, + { + name: "invalid config: upload rate limiting enabled with negative initial QPS", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: -1, + UploadMaxQPS: 1000, + UploadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidUploadInitialQPS, + }, + { + name: "invalid config: upload rate limiting enabled with zero max QPS", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: 0, + UploadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidUploadMaxQPS, + }, + { + name: "invalid config: upload rate limiting enabled with negative max QPS", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: -1, + UploadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidUploadMaxQPS, + }, + { + name: "invalid config: upload rate limiting enabled with zero ramp period", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: 1000, + UploadRampPeriod: 0, + }, + expectedErr: errInvalidUploadRampPeriod, + }, + { + name: "invalid config: upload rate limiting enabled with negative ramp period", + cfg: Config{ + UploadRateLimitEnabled: true, + UploadInitialQPS: 500, + UploadMaxQPS: 1000, + UploadRampPeriod: -1 * time.Minute, + }, + expectedErr: errInvalidUploadRampPeriod, + }, + { + name: "invalid config: read rate limiting enabled with zero initial QPS", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 0, + ReadMaxQPS: 1000, + ReadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidReadInitialQPS, + }, + { + name: "invalid config: read rate limiting enabled with negative 
initial QPS", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: -1, + ReadMaxQPS: 1000, + ReadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidReadInitialQPS, + }, + { + name: "invalid config: read rate limiting enabled with zero max QPS", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 500, + ReadMaxQPS: 0, + ReadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidReadMaxQPS, + }, + { + name: "invalid config: read rate limiting enabled with negative max QPS", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 500, + ReadMaxQPS: -1, + ReadRampPeriod: 20 * time.Minute, + }, + expectedErr: errInvalidReadMaxQPS, + }, + { + name: "invalid config: read rate limiting enabled with zero ramp period", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 500, + ReadMaxQPS: 1000, + ReadRampPeriod: 0, + }, + expectedErr: errInvalidReadRampPeriod, + }, + { + name: "invalid config: read rate limiting enabled with negative ramp period", + cfg: Config{ + ReadRateLimitEnabled: true, + ReadInitialQPS: 500, + ReadMaxQPS: 1000, + ReadRampPeriod: -1 * time.Minute, + }, + expectedErr: errInvalidReadRampPeriod, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.expectedErr != nil { + assert.ErrorIs(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestIsRateLimitError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "generic error", + err: errors.New("some error"), + expected: false, + }, + { + name: "googleapi 429 error", + err: &googleapi.Error{Code: 429}, + expected: true, + }, + { + name: "googleapi 500 error", + err: &googleapi.Error{Code: 500}, + expected: false, + }, + { + name: "googleapi 404 error", + err: &googleapi.Error{Code: 404}, + expected: false, + }, + { + name: "wrapped googleapi 429 error", + err: 
fmt.Errorf("operation failed: %w", &googleapi.Error{Code: 429}), + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := isRateLimitError(tc.err) + assert.Equal(t, tc.expected, result) + }) + } +} diff --git a/pkg/storage/bucket/gcs/config.go b/pkg/storage/bucket/gcs/config.go index 9a7440aea88..0b17413e896 100644 --- a/pkg/storage/bucket/gcs/config.go +++ b/pkg/storage/bucket/gcs/config.go @@ -6,13 +6,24 @@ package gcs import ( + "errors" "flag" + "time" "github.com/grafana/dskit/flagext" "github.com/grafana/mimir/pkg/storage/bucket/common" ) +var ( + errInvalidUploadInitialQPS = errors.New("gcs.upload-initial-qps must be greater than 0 when rate limiting is enabled") + errInvalidUploadMaxQPS = errors.New("gcs.upload-max-qps must be greater than 0 when rate limiting is enabled") + errInvalidUploadRampPeriod = errors.New("gcs.upload-ramp-period must be greater than 0 when rate limiting is enabled") + errInvalidReadInitialQPS = errors.New("gcs.read-initial-qps must be greater than 0 when rate limiting is enabled") + errInvalidReadMaxQPS = errors.New("gcs.read-max-qps must be greater than 0 when rate limiting is enabled") + errInvalidReadRampPeriod = errors.New("gcs.read-ramp-period must be greater than 0 when rate limiting is enabled") +) + // Config holds the config options for GCS backend type Config struct { BucketName string `yaml:"bucket_name"` @@ -29,6 +40,40 @@ type Config struct { // Set to 1 to disable retries. MaxRetries int `yaml:"max_retries" category:"advanced"` + // UploadRateLimitEnabled enables rate limiting for GCS uploads. + // When enabled, uploads gradually ramp up to UploadMaxQPS following + // Google Cloud Storage best practices for request rate ramping. + UploadRateLimitEnabled bool `yaml:"upload_rate_limit_enabled" category:"advanced"` + + // UploadInitialQPS is the initial queries per second limit for GCS uploads + // when rate limiting is enabled. 
The rate will double every UploadRampPeriod + // until it reaches UploadMaxQPS. + UploadInitialQPS int `yaml:"upload_initial_qps" category:"advanced"` + + // UploadMaxQPS is the maximum queries per second limit for GCS uploads. + UploadMaxQPS int `yaml:"upload_max_qps" category:"advanced"` + + // UploadRampPeriod is the time period over which the upload rate doubles. + // Following the Google recommendation, this defaults to 20 minutes. + UploadRampPeriod time.Duration `yaml:"upload_ramp_period" category:"advanced"` + + // ReadRateLimitEnabled enables rate limiting for GCS reads. + // When enabled, reads gradually ramp up to ReadMaxQPS following + // Google Cloud Storage best practices for request rate ramping. + ReadRateLimitEnabled bool `yaml:"read_rate_limit_enabled" category:"advanced"` + + // ReadInitialQPS is the initial queries per second limit for GCS reads + // when rate limiting is enabled. The rate will double every ReadRampPeriod + // until it reaches ReadMaxQPS. + ReadInitialQPS int `yaml:"read_initial_qps" category:"advanced"` + + // ReadMaxQPS is the maximum queries per second limit for GCS reads. + ReadMaxQPS int `yaml:"read_max_qps" category:"advanced"` + + // ReadRampPeriod is the time period over which the read rate doubles. + // Following the Google recommendation, this defaults to 20 minutes. + ReadRampPeriod time.Duration `yaml:"read_ramp_period" category:"advanced"` + HTTP common.HTTPConfig `yaml:"http"` } @@ -44,6 +89,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", cfg.GCSServiceAccountShortDescription()) f.BoolVar(&cfg.EnableUploadRetries, prefix+"gcs.enable-upload-retries", false, "Enable automatic retries for GCS uploads using the RetryAlways policy. Uploads will be retried on transient errors. 
Note: this does not guarantee idempotency.") f.IntVar(&cfg.MaxRetries, prefix+"gcs.max-retries", 20, "Maximum number of attempts for GCS operations (0 = unlimited, 1 = no retries). Applies to both regular and upload retry modes.") + f.BoolVar(&cfg.UploadRateLimitEnabled, prefix+"gcs.upload-rate-limit-enabled", false, "Enable rate limiting for GCS uploads. When enabled, uploads gradually ramp up following Google Cloud Storage best practices.") + f.IntVar(&cfg.UploadInitialQPS, prefix+"gcs.upload-initial-qps", 1000, "Initial queries per second limit for GCS uploads. The rate doubles every ramp period until it reaches the maximum.") + f.IntVar(&cfg.UploadMaxQPS, prefix+"gcs.upload-max-qps", 3200, "Maximum queries per second limit for GCS uploads.") + f.DurationVar(&cfg.UploadRampPeriod, prefix+"gcs.upload-ramp-period", 20*time.Minute, "Time period over which the upload rate doubles, following the Google recommendation.") + f.BoolVar(&cfg.ReadRateLimitEnabled, prefix+"gcs.read-rate-limit-enabled", false, "Enable rate limiting for GCS reads. When enabled, reads gradually ramp up following Google Cloud Storage best practices.") + f.IntVar(&cfg.ReadInitialQPS, prefix+"gcs.read-initial-qps", 5000, "Initial queries per second limit for GCS reads. The rate doubles every ramp period until it reaches the maximum.") + f.IntVar(&cfg.ReadMaxQPS, prefix+"gcs.read-max-qps", 16000, "Maximum queries per second limit for GCS reads.") + f.DurationVar(&cfg.ReadRampPeriod, prefix+"gcs.read-ramp-period", 20*time.Minute, "Time period over which the read rate doubles, following the Google recommendation.") cfg.HTTP.RegisterFlagsWithPrefix(prefix+"gcs.", f) } @@ -58,3 +111,30 @@ func (cfg *Config) GCSServiceAccountLongDescription() string { "\n2. A JSON file in a location known to the gcloud command-line tool: $HOME/.config/gcloud/application_default_credentials.json." + "\n3. On Google Compute Engine it fetches credentials from the metadata server." 
} + +// Validate validates the GCS config and returns an error on failure. +func (cfg *Config) Validate() error { + if cfg.UploadRateLimitEnabled { + if cfg.UploadInitialQPS <= 0 { + return errInvalidUploadInitialQPS + } + if cfg.UploadMaxQPS <= 0 { + return errInvalidUploadMaxQPS + } + if cfg.UploadRampPeriod <= 0 { + return errInvalidUploadRampPeriod + } + } + if cfg.ReadRateLimitEnabled { + if cfg.ReadInitialQPS <= 0 { + return errInvalidReadInitialQPS + } + if cfg.ReadMaxQPS <= 0 { + return errInvalidReadMaxQPS + } + if cfg.ReadRampPeriod <= 0 { + return errInvalidReadRampPeriod + } + } + return nil +} diff --git a/pkg/storage/bucket/gcs/rate_limiter.go b/pkg/storage/bucket/gcs/rate_limiter.go new file mode 100644 index 00000000000..1c95c50aef8 --- /dev/null +++ b/pkg/storage/bucket/gcs/rate_limiter.go @@ -0,0 +1,210 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package gcs + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/time/rate" +) + +type rateLimiterMode int + +const ( + uploadRateLimiter rateLimiterMode = iota + readRateLimiter +) + +const ( + // backoffCooldown is the minimum time between successive backoffs to prevent + // oscillation when multiple concurrent requests hit rate limits. + backoffCooldown = 5 * time.Second +) + +// rateLimiter implements request rate limiting with exponential doubling +// following Google Cloud Storage best practices for ramping up request rates. +// See: https://cloud.google.com/storage/docs/request-rate#ramp-up. +// +// The rate limiter also supports adaptive backoff: when GCS returns a 429 +// rate limit error, the rate is halved (at most once per backoffCooldown) to reduce pressure. 
+type rateLimiter struct { + initialQPS int + maxQPS int + rampPeriod time.Duration + + mu sync.Mutex + startTime time.Time + currentQPS int + limiter *rate.Limiter + lastBackoff time.Time // Time of last backoff to prevent rapid successive backoffs + + rateLimitedSeconds prometheus.Counter + currentQPSGauge prometheus.Gauge + requestsTotal *prometheus.CounterVec + backoffsTotal prometheus.Counter +} + +// newRateLimiter creates a new rate limiter with exponential doubling. +// The rate starts at initialQPS (capped by maxQPS) and doubles every rampPeriod until it reaches maxQPS. +// The mode parameter configures whether to rate limit uploads or reads (used for metrics labels). +// If reg is nil, no metrics will be registered. +func newRateLimiter(name string, initialQPS, maxQPS int, rampPeriod time.Duration, mode rateLimiterMode, reg prometheus.Registerer) *rateLimiter { + var operation string + switch mode { + case uploadRateLimiter: + operation = "upload" + case readRateLimiter: + operation = "read" + default: + panic(fmt.Errorf("unrecognized rateLimiterMode %v", mode)) + } + startQPS := min(initialQPS, maxQPS) + rl := &rateLimiter{ + initialQPS: initialQPS, + maxQPS: maxQPS, + rampPeriod: rampPeriod, + startTime: time.Now(), + currentQPS: startQPS, + limiter: rate.NewLimiter(rate.Limit(startQPS), startQPS*2), // Burst = 2 seconds worth of requests. 
+ } + + if reg != nil { + constLabels := prometheus.Labels{"name": name, "operation": operation} + rl.rateLimitedSeconds = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_gcs_rate_limited_seconds_total", + Help: "Total seconds spent waiting due to GCS rate limiting.", + ConstLabels: constLabels, + }) + rl.currentQPSGauge = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_gcs_current_qps", + Help: "Current queries per second limit for GCS operations.", + ConstLabels: constLabels, + }) + rl.requestsTotal = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_gcs_requests_total", + Help: "Total GCS requests, labeled by whether they were allowed immediately or rate limited.", + ConstLabels: constLabels, + }, []string{"allowed"}) + rl.backoffsTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_gcs_rate_limit_backoffs_total", + Help: "Total number of backoffs due to GCS 429 rate limit errors.", + ConstLabels: constLabels, + }) + rl.currentQPSGauge.Set(float64(startQPS)) + } + + return rl +} + +// Wait blocks until the rate limiter allows a request to proceed. +// It implements exponential doubling of the rate over time. +func (rl *rateLimiter) Wait(ctx context.Context) error { + rl.mu.Lock() + rl.maybeUpdateRate() + rl.mu.Unlock() + + r := rl.limiter.Reserve() + delay := r.Delay() + + if delay == 0 { + if rl.requestsTotal != nil { + rl.requestsTotal.WithLabelValues("true").Inc() + } + return nil + } + + if rl.requestsTotal != nil { + rl.requestsTotal.WithLabelValues("false").Inc() + } + if rl.rateLimitedSeconds != nil { + rl.rateLimitedSeconds.Add(delay.Seconds()) + } + + select { + case <-ctx.Done(): + r.Cancel() + return ctx.Err() + case <-time.After(delay): + return nil + } +} + +// maybeUpdateRate updates the rate limit based on exponential doubling. +// Has to be called with rl.mu lock taken. 
+func (rl *rateLimiter) maybeUpdateRate() { + elapsed := time.Since(rl.startTime) + periodsElapsed := int(elapsed / rl.rampPeriod) + + // Calculate new QPS: initialQPS * 2^periodsElapsed, capped at maxQPS. + newQPS := min(rl.initialQPS, rl.maxQPS) + for range periodsElapsed { + newQPS *= 2 + if newQPS >= rl.maxQPS { + newQPS = rl.maxQPS + break + } + } + + if newQPS != rl.currentQPS { + rl.currentQPS = newQPS + rl.limiter.SetLimit(rate.Limit(newQPS)) + rl.limiter.SetBurst(newQPS * 2) + if rl.currentQPSGauge != nil { + rl.currentQPSGauge.Set(float64(newQPS)) + } + } +} + +// getCurrentQPS returns the current QPS limit for observability. +func (rl *rateLimiter) getCurrentQPS() int { + rl.mu.Lock() + defer rl.mu.Unlock() + rl.maybeUpdateRate() + return rl.currentQPS +} + +// Backoff reduces the rate limit in response to a 429 rate limit error from GCS. +// It halves the current QPS and resets the ramp-up start time so that the rate +// will gradually increase again from the new lower value. +// +// To prevent oscillation when multiple concurrent requests hit rate limits, +// backoff is only applied if at least backoffCooldown has passed since the last backoff. +func (rl *rateLimiter) Backoff() { + rl.mu.Lock() + defer rl.mu.Unlock() + + now := time.Now() + if now.Sub(rl.lastBackoff) < backoffCooldown { + // Too soon since last backoff, skip to avoid oscillation. + return + } + + // Halve the current QPS, but don't go below 1. + newQPS := max(rl.currentQPS/2, 1) + if newQPS == rl.currentQPS { + // Already at minimum, nothing to do. + return + } + + rl.currentQPS = newQPS + rl.limiter.SetLimit(rate.Limit(newQPS)) + rl.limiter.SetBurst(newQPS * 2) + + // Reset start time so ramp-up begins from this new lower rate. + // We set initialQPS to the new backed-off rate so doubling starts from here. 
+ rl.initialQPS = newQPS + rl.startTime = now + rl.lastBackoff = now + + if rl.currentQPSGauge != nil { + rl.currentQPSGauge.Set(float64(newQPS)) + } + if rl.backoffsTotal != nil { + rl.backoffsTotal.Inc() + } +} diff --git a/pkg/storage/bucket/gcs/rate_limiter_test.go b/pkg/storage/bucket/gcs/rate_limiter_test.go new file mode 100644 index 00000000000..e9b0dd86aed --- /dev/null +++ b/pkg/storage/bucket/gcs/rate_limiter_test.go @@ -0,0 +1,331 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package gcs + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateLimiter(t *testing.T) { + t.Run("starts at initial QPS capped by max", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + limiter := newRateLimiter("test", 1000, 3200, 20*time.Minute, uploadRateLimiter, reg) + + assert.Equal(t, 1000, limiter.getCurrentQPS()) + }) + + t.Run("doubles QPS each ramp period until max", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // Use short ramp period for testing. + limiter := newRateLimiter("test", 1000, 8000, 100*time.Millisecond, uploadRateLimiter, reg) + + assert.Equal(t, 1000, limiter.getCurrentQPS()) + + // After one ramp period, should double. + time.Sleep(110 * time.Millisecond) + assert.Equal(t, 2000, limiter.getCurrentQPS()) + + // After two ramp periods, should double again. + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 4000, limiter.getCurrentQPS()) + + // After three ramp periods, should double again and hit cap. + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 8000, limiter.getCurrentQPS()) + + // After four ramp periods, should still be capped at maxQPS. 
+ time.Sleep(100 * time.Millisecond) + assert.Equal(t, 8000, limiter.getCurrentQPS()) + }) + + t.Run("allows requests up to burst limit immediately", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // maxQPS below initialQPS, so it will be capped at maxQPS=500. + limiter := newRateLimiter("test", 1000, 500, 20*time.Minute, uploadRateLimiter, reg) + + ctx := context.Background() + start := time.Now() + + // Should be able to consume initial burst quickly (burst size = 2 * currentQPS = 1000). + for range 500 { + err := limiter.Wait(ctx) + require.NoError(t, err) + } + + elapsed := time.Since(start) + // Should complete in under 100ms (most tokens were pre-allocated). + assert.Less(t, elapsed, 100*time.Millisecond) + }) + + t.Run("blocks when burst is exhausted", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // Low QPS to test blocking (will cap initialQPS=1000 to maxQPS=10). + limiter := newRateLimiter("test", 1000, 10, 20*time.Minute, uploadRateLimiter, reg) + + ctx := context.Background() + + // Consume all initial burst tokens (burst = 2 * 10 = 20). + for range 20 { + err := limiter.Wait(ctx) + require.NoError(t, err) + } + + // Next request should block. + start := time.Now() + err := limiter.Wait(ctx) + require.NoError(t, err) + elapsed := time.Since(start) + + // Should have waited approximately 1/10th of a second (1 token at 10 QPS). + assert.Greater(t, elapsed, 80*time.Millisecond) + assert.Less(t, elapsed, 150*time.Millisecond) + }) + + t.Run("returns error when context cancelled", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // Very low QPS to ensure blocking (will cap initialQPS=1000 to maxQPS=1). + limiter := newRateLimiter("test", 1000, 1, 20*time.Minute, uploadRateLimiter, reg) + + // Consume all initial burst tokens (burst = 2 * 1 = 2). 
+ for range 2 { + err := limiter.Wait(context.Background()) + require.NoError(t, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := limiter.Wait(ctx) + assert.ErrorIs(t, err, context.Canceled) + }) + + t.Run("records metrics for allowed and limited requests", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // maxQPS=100 caps initialQPS=1000 down to 100. + limiter := newRateLimiter("test", 1000, 100, 20*time.Minute, uploadRateLimiter, reg) + + ctx := context.Background() + + // Make some requests that should be allowed immediately. + for range 5 { + err := limiter.Wait(ctx) + require.NoError(t, err) + } + + assert.Equal(t, 5.0, testutil.ToFloat64(limiter.requestsTotal.WithLabelValues("true"))) + assert.Equal(t, 0.0, testutil.ToFloat64(limiter.requestsTotal.WithLabelValues("false"))) + + assert.Equal(t, 100.0, testutil.ToFloat64(limiter.currentQPSGauge)) + + // Consume remaining burst tokens to trigger rate limiting (burst = 200, already used 5). + for range 195 { + _ = limiter.Wait(ctx) + } + + err := limiter.Wait(ctx) + require.NoError(t, err) + // Should have at least one rate-limited request now. + assert.GreaterOrEqual(t, testutil.ToFloat64(limiter.requestsTotal.WithLabelValues("false")), 1.0) + assert.Greater(t, testutil.ToFloat64(limiter.rateLimitedSeconds), 0.0) + }) + + t.Run("is safe for concurrent use", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // maxQPS=50 caps initialQPS=1000 down to 50. 
+ limiter := newRateLimiter("test", 1000, 50, 20*time.Minute, uploadRateLimiter, reg) + + ctx := context.Background() + numGoroutines := 100 + + start := time.Now() + + var wg sync.WaitGroup + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + err := limiter.Wait(ctx) + require.NoError(t, err) + }() + } + wg.Wait() + + elapsed := time.Since(start) + + // With 50 QPS and 100 requests: + // - First 100 requests use initial token bucket (2 seconds worth = 100 tokens) + // - Should complete quickly since burst covers all requests. + assert.Less(t, elapsed, 3*time.Second) + + // Verify all requests were processed. + totalRequests := testutil.ToFloat64(limiter.requestsTotal.WithLabelValues("true")) + + testutil.ToFloat64(limiter.requestsTotal.WithLabelValues("false")) + assert.Equal(t, float64(numGoroutines), totalRequests) + }) + + t.Run("upload and read limiters have separate metrics", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + + // Create rate limiters for different operations. + uploadLimiter := newRateLimiter("test", 100, 100, 20*time.Minute, uploadRateLimiter, reg) + readLimiter := newRateLimiter("test", 200, 200, 20*time.Minute, readRateLimiter, reg) + + ctx := context.Background() + + // Use both limiters. + require.NoError(t, uploadLimiter.Wait(ctx)) + require.NoError(t, readLimiter.Wait(ctx)) + + // Verify separate metrics. + assert.Equal(t, 1.0, testutil.ToFloat64(uploadLimiter.requestsTotal.WithLabelValues("true"))) + assert.Equal(t, 1.0, testutil.ToFloat64(readLimiter.requestsTotal.WithLabelValues("true"))) + assert.Equal(t, 100.0, testutil.ToFloat64(uploadLimiter.currentQPSGauge)) + assert.Equal(t, 200.0, testutil.ToFloat64(readLimiter.currentQPSGauge)) + }) + + t.Run("works without metrics when registerer is nil", func(t *testing.T) { + limiter := newRateLimiter("test", 100, 100, 20*time.Minute, uploadRateLimiter, nil) + + // Verify rate limiter works without metrics. 
+ ctx := context.Background() + err := limiter.Wait(ctx) + require.NoError(t, err) + assert.Nil(t, limiter.requestsTotal) + assert.Nil(t, limiter.currentQPSGauge) + assert.Nil(t, limiter.rateLimitedSeconds) + + // Verify rate limiting still works. + assert.Equal(t, 100, limiter.getCurrentQPS()) + }) + + t.Run("rate limits without panic when registerer is nil", func(t *testing.T) { + // Test that rate limiting works correctly even with nil registerer + // when we need to wait (i.e., exceed the burst). + limiter := newRateLimiter("test", 1000, 10, 20*time.Minute, uploadRateLimiter, nil) + + ctx := context.Background() + + // Consume all burst tokens (burst = 2 * 10 = 20). + for range 20 { + err := limiter.Wait(ctx) + require.NoError(t, err) + } + + // Next request should block but not panic due to nil metrics. + start := time.Now() + err := limiter.Wait(ctx) + require.NoError(t, err) + elapsed := time.Since(start) + + // Should have waited approximately 1/10th of a second (1 token at 10 QPS). + assert.Greater(t, elapsed, 80*time.Millisecond) + }) + + t.Run("respects custom initial QPS", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + // Use a custom initial QPS of 50 with max of 200. + limiter := newRateLimiter("test", 50, 200, 100*time.Millisecond, uploadRateLimiter, reg) + + assert.Equal(t, 50, limiter.getCurrentQPS()) + + // After one ramp period, should double to 100. + time.Sleep(110 * time.Millisecond) + assert.Equal(t, 100, limiter.getCurrentQPS()) + + // After two ramp periods, should double to 200 (max). + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 200, limiter.getCurrentQPS()) + }) + + t.Run("backoff halves QPS", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + limiter := newRateLimiter("test", 100, 100, 20*time.Minute, uploadRateLimiter, reg) + + assert.Equal(t, 100, limiter.getCurrentQPS()) + + limiter.Backoff() + assert.Equal(t, 50, limiter.getCurrentQPS()) + + // Verify metric was incremented. 
+ assert.Equal(t, 1.0, testutil.ToFloat64(limiter.backoffsTotal)) + assert.Equal(t, 50.0, testutil.ToFloat64(limiter.currentQPSGauge)) + }) + + t.Run("backoff respects cooldown", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + limiter := newRateLimiter("test", 100, 100, 20*time.Minute, uploadRateLimiter, reg) + + assert.Equal(t, 100, limiter.getCurrentQPS()) + + // First backoff should work. + limiter.Backoff() + assert.Equal(t, 50, limiter.getCurrentQPS()) + + // Second immediate backoff should be ignored due to cooldown. + limiter.Backoff() + assert.Equal(t, 50, limiter.getCurrentQPS()) + + // Only one backoff should have been recorded. + assert.Equal(t, 1.0, testutil.ToFloat64(limiter.backoffsTotal)) + }) + + t.Run("backoff does not go below 1 QPS", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + limiter := newRateLimiter("test", 2, 2, 20*time.Minute, uploadRateLimiter, reg) + + assert.Equal(t, 2, limiter.getCurrentQPS()) + + // First backoff: 2 -> 1. + limiter.Backoff() + assert.Equal(t, 1, limiter.getCurrentQPS()) + + // Manually reset lastBackoff to allow another backoff. + limiter.mu.Lock() + limiter.lastBackoff = time.Time{} + limiter.mu.Unlock() + + // Second backoff should not go below 1. + limiter.Backoff() + assert.Equal(t, 1, limiter.getCurrentQPS()) + + // Only one backoff should have been recorded (second was a no-op). + assert.Equal(t, 1.0, testutil.ToFloat64(limiter.backoffsTotal)) + }) + + t.Run("backoff restarts ramp-up from backed-off rate", func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + limiter := newRateLimiter("test", 100, 200, 50*time.Millisecond, uploadRateLimiter, reg) + + assert.Equal(t, 100, limiter.getCurrentQPS()) + + // Let it ramp up to 200. + time.Sleep(60 * time.Millisecond) + assert.Equal(t, 200, limiter.getCurrentQPS()) + + // Backoff: 200 -> 100. + limiter.Backoff() + assert.Equal(t, 100, limiter.getCurrentQPS()) + + // After one ramp period, should double to 200 again. 
+ time.Sleep(60 * time.Millisecond) + assert.Equal(t, 200, limiter.getCurrentQPS()) + }) + + t.Run("backoff works without metrics when registerer is nil", func(t *testing.T) { + limiter := newRateLimiter("test", 100, 100, 20*time.Minute, uploadRateLimiter, nil) + + assert.Equal(t, 100, limiter.getCurrentQPS()) + + // Should not panic. + limiter.Backoff() + assert.Equal(t, 50, limiter.getCurrentQPS()) + }) +}