@@ -211,8 +211,10 @@ func (client *DataSetClient) isProcessingEvents() bool {
211211 return client .eventsEnqueued .Load () > client .eventsProcessed .Load ()
212212}
213213
214- // Shutdown stops processing of new events and waits until all the events that are
215- // being processed are really processed (sent to DataSet).
214+ // Shutdown takes care of shutdown of client. It does following steps
215+ // - stops processing of new events,
216+ // - tries (with 1st half of shutdownMaxTimeout period) to process (add into buffers) all the events,
217+ // - tries (with 2nd half of shutdownMaxTimeout period) to send processed events (buffers) into DataSet
216218func (client * DataSetClient ) Shutdown () error {
217219 client .Logger .Info ("Shutting down - BEGIN" )
218220 // mark as finished to prevent processing of further events
@@ -222,33 +224,25 @@ func (client *DataSetClient) Shutdown() error {
222224 client .logStatistics ()
223225
224226 var lastError error = nil
227+ shutdownTimeout := minDuration (client .Config .BufferSettings .RetryMaxElapsedTime , client .Config .BufferSettings .RetryShutdownTimeout )
225228 expBackoff := backoff.ExponentialBackOff {
226229 InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
227230 RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
228231 Multiplier : client .Config .BufferSettings .RetryMultiplier ,
229232 MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
230- MaxElapsedTime : client . Config . BufferSettings . RetryMaxElapsedTime ,
233+ MaxElapsedTime : shutdownTimeout / 2 ,
231234 Stop : backoff .Stop ,
232235 Clock : backoff .SystemClock ,
233236 }
234237 expBackoff .Reset ()
235238
236- // first we wait until all the events in buffers are added into buffers
237- // then we are waiting until all the buffers are processed
238- // if some progress is made we restart the waiting times
239-
240- // do wait for all events to be processed
239+ // try (with timeout) to process (add into buffers) events,
241240 retryNum := 0
242- lastProcessed := client . eventsProcessed . Load ()
241+ processingStart := time . Now ()
243242 for client .isProcessingEvents () {
244243 // log statistics
245244 client .logStatistics ()
246245
247- // if some events were processed restart retry interval
248- if client .eventsProcessed .Load () != lastProcessed {
249- expBackoff .Reset ()
250- }
251- lastProcessed = client .eventsProcessed .Load ()
252246 backoffDelay := expBackoff .NextBackOff ()
253247 client .Logger .Info (
254248 "Shutting down - processing events" ,
@@ -279,22 +273,26 @@ func (client *DataSetClient) Shutdown() error {
279273 client .Logger .Info ("Shutting down - publishing all buffers" )
280274 client .publishAllBuffers ()
281275
282- // do wait for all buffers to be processed
276+ // reinitialize expBackoff with MaxElapsedTime based on actually elapsed time of processing (previous phase)
277+ processingElapsed := time .Since (processingStart )
278+ remainingShutdownTimeout := maxDuration (shutdownTimeout - processingElapsed , shutdownTimeout / 2 )
279+ expBackoff = backoff.ExponentialBackOff {
280+ InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
281+ RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
282+ Multiplier : client .Config .BufferSettings .RetryMultiplier ,
283+ MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
284+ MaxElapsedTime : remainingShutdownTimeout ,
285+ Stop : backoff .Stop ,
286+ Clock : backoff .SystemClock ,
287+ }
288+ // do wait (with timeout) for all buffers to be sent to the server
283289 retryNum = 0
284290 expBackoff .Reset ()
285- lastProcessed = client .buffersProcessed .Load ()
286- lastDropped := client .buffersDropped .Load ()
287- initialDropped := lastDropped
291+ initialDropped := client .buffersDropped .Load ()
288292 for client .isProcessingBuffers () {
289293 // log statistics
290294 client .logStatistics ()
291295
292- // if some buffers were processed restart retry interval
293- if client .buffersProcessed .Load ()+ lastDropped != lastProcessed + client .buffersDropped .Load () {
294- expBackoff .Reset ()
295- }
296- lastProcessed = client .buffersProcessed .Load ()
297- lastDropped = client .buffersDropped .Load ()
298296 backoffDelay := expBackoff .NextBackOff ()
299297 client .Logger .Info (
300298 "Shutting down - processing buffers" ,
@@ -476,3 +474,17 @@ func truncateText(text string, length int) string {
476474
477475 return text
478476}
477+
478+ func minDuration (a , b time.Duration ) time.Duration {
479+ if a <= b {
480+ return a
481+ }
482+ return b
483+ }
484+
485+ func maxDuration (a , b time.Duration ) time.Duration {
486+ if a >= b {
487+ return a
488+ }
489+ return b
490+ }
0 commit comments