@@ -307,8 +307,9 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
307307 client .buffersProcessed .Add (1 )
308308 continue
309309 }
310- client .sendBufferWithRetryPolicy (buf )
311- client .buffersProcessed .Add (1 )
310+ if client .sendBufferWithRetryPolicy (buf ) {
311+ client .buffersProcessed .Add (1 )
312+ }
312313 } else {
313314 client .Logger .Error (
314315 "Cannot convert message to Buffer" ,
@@ -322,7 +323,7 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
322323}
323324
324325// Sends buffer to DataSet. If not succeeds and try is possible (it retryable), try retry until possible (timeout)
325- func (client * DataSetClient ) sendBufferWithRetryPolicy (buf * buffer.Buffer ) {
326+ func (client * DataSetClient ) sendBufferWithRetryPolicy (buf * buffer.Buffer ) bool {
326327 // Do not use NewExponentialBackOff since it calls Reset and the code here must
327328 // call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
328329 expBackoff := backoff.ExponentialBackOff {
@@ -347,7 +348,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
347348 lastHttpStatus = HttpErrorHasErrorMessage
348349 client .LastHttpStatus .Store (lastHttpStatus )
349350 client .onBufferDrop (buf , lastHttpStatus , err )
350- break // exit loop (failed to send buffer)
351+ return false // exit loop (failed to send buffer)
351352 }
352353 lastHttpStatus = HttpErrorCannotConnect
353354 client .LastHttpStatus .Store (lastHttpStatus )
@@ -375,7 +376,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
375376 if isOkStatus (lastHttpStatus ) {
376377 // everything was fine, there is no need for retries
377378 client .bytesAPIAccepted .Add (uint64 (payloadLen ))
378- break // exit loop (buffer sent)
379+ return true // exit loop (buffer sent)
379380 }
380381
381382 backoffDelay := expBackoff .NextBackOff ()
@@ -384,7 +385,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
384385 // throw away the batch
385386 err = fmt .Errorf ("max elapsed time expired %w" , err )
386387 client .onBufferDrop (buf , lastHttpStatus , err )
387- break // exit loop (failed to send buffer)
388+ return false // exit loop (failed to send buffer)
388389 }
389390
390391 if isRetryableStatus (lastHttpStatus ) {
@@ -406,7 +407,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
406407 } else {
407408 err = fmt .Errorf ("non recoverable error %w" , err )
408409 client .onBufferDrop (buf , lastHttpStatus , err )
409- break // exit loop (failed to send buffer)
410+ return false // exit loop (failed to send buffer)
410411 }
411412 retryNum ++
412413 }
@@ -420,9 +421,8 @@ func (client *DataSetClient) statisticsSweeper() {
420421 }
421422}
422423
423- func (client * DataSetClient ) logStatistics () {
424- mb := float64 (1024 * 1024 )
425-
424+ // Statistics returns statistics about events, buffers processing from the start time
425+ func (client * DataSetClient ) Statistics () * Statistics {
426426 // for how long are events being processed
427427 firstAt := time .Unix (0 , client .firstReceivedAt .Load ())
428428 lastAt := time .Unix (0 , client .lastAcceptedAt .Load ())
@@ -431,54 +431,100 @@ func (client *DataSetClient) logStatistics() {
431431
432432 // if nothing was processed, do not log statistics
433433 if processingInSec <= 0 {
434- return
434+ return nil
435435 }
436436
437437 // log buffer stats
438438 bProcessed := client .buffersProcessed .Load ()
439439 bEnqueued := client .buffersEnqueued .Load ()
440440 bDropped := client .buffersDropped .Load ()
441441 bBroken := client .buffersBroken .Load ()
442- client .Logger .Info (
443- "Buffers' Queue Stats:" ,
444- zap .Uint64 ("processed" , bProcessed ),
445- zap .Uint64 ("enqueued" , bEnqueued ),
446- zap .Uint64 ("dropped" , bDropped ),
447- zap .Uint64 ("broken" , bBroken ),
448- zap .Uint64 ("waiting" , bEnqueued - bProcessed - bDropped - bBroken ),
449- zap .Float64 ("processingS" , processingInSec ),
450- )
442+
443+ buffersStats := QueueStats {
444+ bEnqueued ,
445+ bProcessed ,
446+ bDropped ,
447+ bBroken ,
448+ processingDur ,
449+ }
451450
452451 // log events stats
453452 eProcessed := client .eventsProcessed .Load ()
454453 eEnqueued := client .eventsEnqueued .Load ()
455454 eDropped := client .eventsDropped .Load ()
456455 eBroken := client .eventsBroken .Load ()
456+
457+ eventsStats := QueueStats {
458+ eEnqueued ,
459+ eProcessed ,
460+ eDropped ,
461+ eBroken ,
462+ processingDur ,
463+ }
464+
465+ // log transferred stats
466+ bAPISent := client .bytesAPISent .Load ()
467+ bAPIAccepted := client .bytesAPIAccepted .Load ()
468+ transferStats := TransferStats {
469+ bAPISent ,
470+ bAPIAccepted ,
471+ bProcessed ,
472+ processingDur ,
473+ }
474+
475+ return & Statistics {
476+ Buffers : buffersStats ,
477+ Events : eventsStats ,
478+ Transfer : transferStats ,
479+ }
480+ }
481+
482+ func (client * DataSetClient ) logStatistics () {
483+ stats := client .Statistics ()
484+ if stats == nil {
485+ return
486+ }
487+
488+ b := stats .Buffers
489+ client .Logger .Info (
490+ "Buffers' Queue Stats:" ,
491+ zap .Uint64 ("processed" , b .Processed ()),
492+ zap .Uint64 ("enqueued" , b .Enqueued ()),
493+ zap .Uint64 ("dropped" , b .Dropped ()),
494+ zap .Uint64 ("broken" , b .Broken ()),
495+ zap .Uint64 ("waiting" , b .Waiting ()),
496+ zap .Float64 ("successRate" , b .SuccessRate ()),
497+ zap .Float64 ("processingS" , b .ProcessingTime ().Seconds ()),
498+ zap .Duration ("processing" , b .ProcessingTime ()),
499+ )
500+
501+ // log events stats
502+ e := stats .Events
457503 client .Logger .Info (
458504 "Events' Queue Stats:" ,
459- zap .Uint64 ("processed" , eProcessed ),
460- zap .Uint64 ("enqueued" , eEnqueued ),
461- zap .Uint64 ("dropped" , eDropped ),
462- zap .Uint64 ("broken" , eBroken ),
463- zap .Uint64 ("waiting" , eEnqueued - eProcessed - eDropped - eBroken ),
464- zap .Float64 ("processingS" , processingInSec ),
505+ zap .Uint64 ("processed" , e .Processed ()),
506+ zap .Uint64 ("enqueued" , e .Enqueued ()),
507+ zap .Uint64 ("dropped" , e .Dropped ()),
508+ zap .Uint64 ("broken" , e .Broken ()),
509+ zap .Uint64 ("waiting" , e .Waiting ()),
510+ zap .Float64 ("successRate" , e .SuccessRate ()),
511+ zap .Float64 ("processingS" , e .ProcessingTime ().Seconds ()),
512+ zap .Duration ("processing" , e .ProcessingTime ()),
465513 )
466514
467515 // log transferred stats
468- bAPISent := float64 (client .bytesAPISent .Load ())
469- bAPIAccepted := float64 (client .bytesAPIAccepted .Load ())
470- throughput := bAPIAccepted / mb / processingInSec
471- successRate := (bAPIAccepted + 1 ) / (bAPISent + 1 )
472- perBuffer := (bAPIAccepted ) / float64 (bProcessed )
516+ mb := float64 (1024 * 1024 )
517+ t := stats .Transfer
473518 client .Logger .Info (
474519 "Transfer Stats:" ,
475- zap .Float64 ("bytesSentMB" , bAPISent / mb ),
476- zap .Float64 ("bytesAcceptedMB" , bAPIAccepted / mb ),
477- zap .Float64 ("throughputMBpS" , throughput ),
478- zap .Float64 ("perBufferMB" , perBuffer / mb ),
479- zap .Float64 ("successRate" , successRate ),
480- zap .Float64 ("processingS" , processingInSec ),
481- zap .Duration ("processing" , processingDur ),
520+ zap .Float64 ("bytesSentMB" , float64 (t .BytesSent ())/ mb ),
521+ zap .Float64 ("bytesAcceptedMB" , float64 (t .BytesAccepted ())/ mb ),
522+ zap .Float64 ("throughputMBpS" , t .ThroughputBpS ()/ mb ),
523+ zap .Uint64 ("buffersProcessed" , t .BuffersProcessed ()),
524+ zap .Float64 ("perBufferMB" , t .AvgBufferBytes ()/ mb ),
525+ zap .Float64 ("successRate" , t .SuccessRate ()),
526+ zap .Float64 ("processingS" , t .ProcessingTime ().Seconds ()),
527+ zap .Duration ("processing" , t .ProcessingTime ()),
482528 )
483529}
484530
0 commit comments