@@ -217,28 +217,38 @@ func (client *DataSetClient) isProcessingEvents() bool {
217217// - tries (with 2nd half of shutdownMaxTimeout period) to send processed events (buffers) into DataSet
218218func (client * DataSetClient ) Shutdown () error {
219219 client .Logger .Info ("Shutting down - BEGIN" )
220+ // start measuring processing time
221+ processingStart := time .Now ()
222+
220223 // mark as finished to prevent processing of further events
221224 client .finished .Store (true )
222225
223226 // log statistics when finish was called
224227 client .logStatistics ()
225228
229+ retryShutdownTimeout := client .Config .BufferSettings .RetryShutdownTimeout
230+ maxElapsedTime := retryShutdownTimeout / 2 - 100 * time .Millisecond
231+ client .Logger .Info (
232+ "Shutting down - waiting for events" ,
233+ zap .Duration ("maxElapsedTime" , maxElapsedTime ),
234+ zap .Duration ("retryShutdownTimeout" , retryShutdownTimeout ),
235+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
236+ )
237+
226238 var lastError error = nil
227- shutdownTimeout := minDuration (client .Config .BufferSettings .RetryMaxElapsedTime , client .Config .BufferSettings .RetryShutdownTimeout )
228239 expBackoff := backoff.ExponentialBackOff {
229240 InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
230241 RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
231242 Multiplier : client .Config .BufferSettings .RetryMultiplier ,
232243 MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
233- MaxElapsedTime : shutdownTimeout / 2 ,
244+ MaxElapsedTime : maxElapsedTime ,
234245 Stop : backoff .Stop ,
235246 Clock : backoff .SystemClock ,
236247 }
237248 expBackoff .Reset ()
238249
239250 // try (with timeout) to process (add into buffers) events,
240251 retryNum := 0
241- processingStart := time .Now ()
242252 for client .isProcessingEvents () {
243253 // log statistics
244254 client .logStatistics ()
@@ -250,38 +260,40 @@ func (client *DataSetClient) Shutdown() error {
250260 zap .Duration ("backoffDelay" , backoffDelay ),
251261 zap .Uint64 ("eventsEnqueued" , client .eventsEnqueued .Load ()),
252262 zap .Uint64 ("eventsProcessed" , client .eventsProcessed .Load ()),
263+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
264+ zap .Duration ("maxElapsedTime" , maxElapsedTime ),
253265 )
254266 if backoffDelay == expBackoff .Stop {
255- lastError = fmt .Errorf (
256- "not all events have been processed - %d" ,
257- client .eventsEnqueued .Load ()- client .eventsProcessed .Load (),
258- )
259- client .Logger .Error (
260- "Shutting down - not all events have been processed" ,
261- zap .Int ("retryNum" , retryNum ),
262- zap .Duration ("backoffDelay" , backoffDelay ),
263- zap .Uint64 ("eventsEnqueued" , client .eventsEnqueued .Load ()),
264- zap .Uint64 ("eventsProcessed" , client .eventsProcessed .Load ()),
265- )
266267 break
267268 }
268269 time .Sleep (backoffDelay )
269270 retryNum ++
270271 }
271272
272273 // send all buffers
273- client .Logger .Info ("Shutting down - publishing all buffers" )
274+ client .Logger .Info (
275+ "Shutting down - publishing all buffers" ,
276+ zap .Duration ("retryShutdownTimeout" , retryShutdownTimeout ),
277+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
278+ )
274279 client .publishAllBuffers ()
275280
276281 // reinitialize expBackoff with MaxElapsedTime based on actually elapsed time of processing (previous phase)
277282 processingElapsed := time .Since (processingStart )
278- remainingShutdownTimeout := maxDuration (shutdownTimeout - processingElapsed , shutdownTimeout / 2 )
283+ maxElapsedTime = maxDuration (retryShutdownTimeout - processingElapsed , retryShutdownTimeout / 2 )
284+ client .Logger .Info (
285+ "Shutting down - waiting for buffers" ,
286+ zap .Duration ("maxElapsedTime" , maxElapsedTime ),
287+ zap .Duration ("retryShutdownTimeout" , retryShutdownTimeout ),
288+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
289+ )
290+
279291 expBackoff = backoff.ExponentialBackOff {
280292 InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
281293 RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
282294 Multiplier : client .Config .BufferSettings .RetryMultiplier ,
283295 MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
284- MaxElapsedTime : remainingShutdownTimeout ,
296+ MaxElapsedTime : maxElapsedTime ,
285297 Stop : backoff .Stop ,
286298 Clock : backoff .SystemClock ,
287299 }
@@ -301,25 +313,43 @@ func (client *DataSetClient) Shutdown() error {
301313 zap .Uint64 ("buffersEnqueued" , client .buffersEnqueued .Load ()),
302314 zap .Uint64 ("buffersProcessed" , client .buffersProcessed .Load ()),
303315 zap .Uint64 ("buffersDropped" , client .buffersDropped .Load ()),
316+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
317+ zap .Duration ("maxElapsedTime" , maxElapsedTime ),
304318 )
305319 if backoffDelay == expBackoff .Stop {
306- lastError = fmt .Errorf (
307- "not all buffers have been processed - %d" ,
308- client .buffersEnqueued .Load ()- client .buffersProcessed .Load ()- client .buffersDropped .Load (),
309- )
310- client .Logger .Error (
311- "Shutting down - not all buffers have been processed" ,
312- zap .Int ("retryNum" , retryNum ),
313- zap .Uint64 ("buffersEnqueued" , client .buffersEnqueued .Load ()),
314- zap .Uint64 ("buffersProcessed" , client .buffersProcessed .Load ()),
315- zap .Uint64 ("buffersDropped" , client .buffersDropped .Load ()),
316- )
317320 break
318321 }
319322 time .Sleep (backoffDelay )
320323 retryNum ++
321324 }
322325
326+ // construct error messages
327+ if client .isProcessingEvents () {
328+ lastError = fmt .Errorf (
329+ "not all events have been processed - %d" ,
330+ client .eventsEnqueued .Load ()- client .eventsProcessed .Load (),
331+ )
332+ client .Logger .Error (
333+ "Shutting down - not all events have been processed" ,
334+ zap .Uint64 ("eventsEnqueued" , client .eventsEnqueued .Load ()),
335+ zap .Uint64 ("eventsProcessed" , client .eventsProcessed .Load ()),
336+ )
337+ }
338+
339+ if client .isProcessingBuffers () {
340+ lastError = fmt .Errorf (
341+ "not all buffers have been processed - %d" ,
342+ client .buffersEnqueued .Load ()- client .buffersProcessed .Load ()- client .buffersDropped .Load (),
343+ )
344+ client .Logger .Error (
345+ "Shutting down - not all buffers have been processed" ,
346+ zap .Int ("retryNum" , retryNum ),
347+ zap .Uint64 ("buffersEnqueued" , client .buffersEnqueued .Load ()),
348+ zap .Uint64 ("buffersProcessed" , client .buffersProcessed .Load ()),
349+ zap .Uint64 ("buffersDropped" , client .buffersDropped .Load ()),
350+ )
351+ }
352+
323353 buffersDropped := client .buffersDropped .Load () - initialDropped
324354 if buffersDropped > 0 {
325355 lastError = fmt .Errorf (
@@ -336,9 +366,17 @@ func (client *DataSetClient) Shutdown() error {
336366 client .logStatistics ()
337367
338368 if lastError == nil {
339- client .Logger .Info ("Shutting down - success" )
369+ client .Logger .Info (
370+ "Shutting down - success" ,
371+ zap .Duration ("retryShutdownTimeout" , retryShutdownTimeout ),
372+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
373+ )
340374 } else {
341- client .Logger .Error ("Shutting down - error" , zap .Error (lastError ))
375+ client .Logger .Error (
376+ "Shutting down - error" , zap .Error (lastError ),
377+ zap .Duration ("retryShutdownTimeout" , retryShutdownTimeout ),
378+ zap .Duration ("elapsedTime" , time .Since (processingStart )),
379+ )
342380 if client .LastError () == nil {
343381 return lastError
344382 }
@@ -475,13 +513,6 @@ func truncateText(text string, length int) string {
475513 return text
476514}
477515
478- func minDuration (a , b time.Duration ) time.Duration {
479- if a <= b {
480- return a
481- }
482- return b
483- }
484-
485516func maxDuration (a , b time.Duration ) time.Duration {
486517 if a >= b {
487518 return a
0 commit comments