@@ -16,6 +16,7 @@ package bq
1616
1717import (
1818 "log"
19+ "math/rand"
1920 "os"
2021 "strings"
2122 "sync"
@@ -219,6 +220,38 @@ func (in *BQInserter) Flush() error {
219220 // enough to stay within quota. It may result in AppEngine reducing the
220221 // number of workers, but that is fine - it will also result in staying
221222 // under the quota.
223+ // Analysis:
224+ // We can handle a minimum of 10 inserts per second, because
225+ // the default quota is 100MB/sec, and the limit on requests
226+ // is 10MB per request. Since generally inserts are smaller,
227+ // the typical number is more like 20 inserts/sec.
228+ // The net effect we need to see is that, if the pipeline capacity
229+ // exceeds the quota by 10%, then the pipeline needs to slow down
230+ // by roughly 10% to fit within the quota. The incoming request
231+ // rate is dictated by the task queue, and ultimately the handler
232+ // must reject 10% of the incoming requests. This only happens
233+ // when 10% of the instances have hit MAX_WORKERS.
234+ // If the capacity of the pipeline is, e.g., 2X the task queue rate,
235+ // then each task will need to be slowed down to the point that it
236+ // takes roughly 2.2X longer than it would without any Quota exceeded
237+ // errors. For NDT, the 100MB tasks require about 35 concurrent tasks
238+ // to process 60 tasks/min, indicating that they require about 35
239+ // seconds per task. There are about 70 tests/task, so this is about
240+ // 7 buffer flushes per second (of 10 tests each), or on average, about
241+ // one buffer flush every 5 seconds for each task.
242+ // The batch job might have 50 instances, and process 900 tasks
243+ // concurrently. If this had to be scaled back to 50%, the tasks
244+ // would have to spend 50% of their time sleeping between Put requests.
245+ // Since each task typically takes about 35 seconds, each task would
246+ // on average experience just over one 'Quota exceeded' error in order
247+ // to slow the pipeline down by 50%.
248+ // Note that a secondary effect is to reduce the CPU utilization, which
249+ // will likely trigger a reduction in the number of instances running.
250+ // Under these conditions, AppEngine would reduce the number of instances
251+ // until the target utilization is reached, reducing the number of
252+ // concurrent tasks, and thus the frequency at which the tasks would
253+ // experience 'Quota exceeded' errors.
254+
222255 var err error
223256 for i := 0 ; i < 10 ; i ++ {
224257 // This is heavyweight, and may run forever without a context deadline.
@@ -229,7 +262,8 @@ func (in *BQInserter) Flush() error {
229262 break
230263 }
231264 metrics .WarningCount .WithLabelValues (in .TableBase (), "" , "Quota Exceeded" ).Inc ()
232- time .Sleep (in .params .RetryDelay )
265+ // Use some randomness to reduce risk of synchronization across tasks.
266+ time .Sleep (time .Duration ((0.5 + rand .Float64 ()) * in .params .RetryDelay .Seconds ()))
233267 }
234268
235269 // If there is still an error, then handle it.
0 commit comments