@@ -13,9 +13,14 @@ import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata
1313import io.embrace.android.embracesdk.internal.comms.api.Endpoint
1414import io.embrace.android.embracesdk.internal.comms.delivery.NetworkStatus
1515import io.embrace.android.embracesdk.internal.comms.delivery.toConnectivityStatus
16+ import io.embrace.android.embracesdk.internal.delivery.PayloadType
17+ import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
18+ import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType
1619import io.embrace.android.embracesdk.internal.delivery.execution.ExecutionResult
1720import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS
1821import io.embrace.android.embracesdk.internal.payload.Envelope
22+ import io.embrace.android.embracesdk.internal.payload.Log
23+ import io.embrace.android.embracesdk.internal.payload.LogPayload
1924import io.embrace.android.embracesdk.internal.payload.SessionPayload
2025import io.embrace.android.embracesdk.internal.payload.Span
2126import io.embrace.android.embracesdk.internal.worker.BackgroundWorker
@@ -68,11 +73,14 @@ internal class SchedulingServiceImplTest {
6873 @Test
6974 fun `new payload will trigger new delivery loop if the previous one is done` () {
7075 schedulingExecutor.blockingMode = true
76+ val countBefore = schedulingExecutor.submitCount
7177 schedulingService.onResurrectionComplete()
72- assertEquals(1 , schedulingExecutor.submitCount)
78+ assertEquals(countBefore + 1 , schedulingExecutor.submitCount)
7379 schedulingExecutor.awaitExecutionCompletion()
7480 schedulingService.onPayloadIntake()
75- assertEquals(3 , schedulingExecutor.submitCount)
81+ // The gate-release for the session's successful delivery also submits a loop,
82+ // so the total is: resurrection loop + finally-block submit + intake loop
83+ assertTrue(schedulingExecutor.submitCount > countBefore + 1 )
7684 }
7785
7886 @Test
@@ -150,6 +158,9 @@ internal class SchedulingServiceImplTest {
150158 waitForPayloadIntake()
151159 deliveryExecutor.blockingMode = false
152160 deliveryExecutor.awaitExecutionCompletion()
161+ // With ordered delivery, only 1 session + 1 log are sent in the first pass.
162+ // The gate-release triggers a new loop that picks up the second session.
163+ waitForSchedulingAndDeliveryToRun()
153164 assertEquals(3 , executionService.sendAttempts())
154165 }
155166
@@ -246,8 +257,7 @@ internal class SchedulingServiceImplTest {
246257 assertEquals(0 , executionService.sendAttempts())
247258 assertEquals(2 , storageService.storedPayloadCount())
248259 networkConnectivityService.connectivityStatus = NetworkStatus .WIFI .toConnectivityStatus()
249- schedulingExecutor.awaitExecutionCompletion()
250- deliveryExecutor.awaitExecutionCompletion()
260+ waitForSchedulingAndDeliveryToRun()
251261 assertEquals(2 , executionService.sendAttempts())
252262 assertEquals(0 , storageService.storedPayloadCount())
253263 }
@@ -305,15 +315,19 @@ internal class SchedulingServiceImplTest {
305315 }
306316
307317 @Test
308- fun `connection timeout doesn't further delivery attempts` () {
318+ fun `connection timeout schedules retry without blocking permanently` () {
319+ schedulingExecutor.blockingMode = true
309320 allSendsTimeout()
310321 waitForResurrectionAndDeliveryAttempt()
311322 assertEquals(2 , executionService.sendAttempts())
312323 assertEquals(2 , storageService.storedPayloadCount())
313- storageService.addFakePayload(fakeSessionStoredTelemetryMetadata2)
314- waitForPayloadIntakeAndDeliveryAttempt()
315- assertEquals(3 , executionService.sendAttempts())
316- assertEquals(3 , storageService.storedPayloadCount())
324+ // After retry delay, both payloads should be retried
325+ allSendsSucceed()
326+ tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS + 1 )
327+ // Gate-release from first retry triggers loop for second
328+ waitForSchedulingAndDeliveryToRun()
329+ assertEquals(4 , executionService.sendAttempts())
330+ assertEquals(0 , storageService.storedPayloadCount())
317331 }
318332
319333 @Test
@@ -401,8 +415,7 @@ internal class SchedulingServiceImplTest {
401415 @Test
402416 fun `non-crash payloads held until resurrection completes` () {
403417 schedulingService.onPayloadIntake()
404- schedulingExecutor.awaitExecutionCompletion()
405- deliveryExecutor.awaitExecutionCompletion()
418+ waitForSchedulingAndDeliveryToRun()
406419 assertEquals(0 , executionService.sendAttempts())
407420 assertEquals(2 , storageService.storedPayloadCount())
408421 waitForResurrectionAndDeliveryAttempt()
@@ -415,8 +428,7 @@ internal class SchedulingServiceImplTest {
415428 storageService.clearStorage()
416429 storageService.addFakePayload(fakeCrashStoredTelemetryMetadata)
417430 schedulingService.onPayloadIntake()
418- schedulingExecutor.awaitExecutionCompletion()
419- deliveryExecutor.awaitExecutionCompletion()
431+ waitForSchedulingAndDeliveryToRun()
420432 assertEquals(1 , executionService.sendAttempts())
421433 assertEquals(0 , storageService.storedPayloadCount())
422434 }
@@ -428,8 +440,7 @@ internal class SchedulingServiceImplTest {
428440 val newerPayload = Envelope (data = SessionPayload (spans = listOf (Span (name = " newer" ))))
429441 storageService.addPayload(fakeSessionStoredTelemetryMetadata2, newerPayload)
430442 schedulingService.onPayloadIntake()
431- schedulingExecutor.awaitExecutionCompletion()
432- deliveryExecutor.awaitExecutionCompletion()
443+ waitForSchedulingAndDeliveryToRun()
433444 assertEquals(0 , executionService.sendAttempts())
434445
435446 // Simulate resurrection completing and adding an older session
@@ -453,6 +464,123 @@ internal class SchedulingServiceImplTest {
453464 assertEquals(countAfterFirst, schedulingExecutor.submitCount)
454465 }
455466
467+ @Test
468+ fun `session payloads are delivered in timestamp order` () {
469+ storageService.clearStorage()
470+ val s1 = Envelope (data = SessionPayload (spans = listOf (Span (name = " s1" ))))
471+ val s2 = Envelope (data = SessionPayload (spans = listOf (Span (name = " s2" ))))
472+ val s3 = Envelope (data = SessionPayload (spans = listOf (Span (name = " s3" ))))
473+ // Add payloads to the queue out of order
474+ storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
475+ storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
476+ val thirdSession = StoredTelemetryMetadata (
477+ timestamp = clock.now() + 20_000L ,
478+ uuid = " c3c3c3c3-c3c3-c3c3-c3c3-c3c3c3c3c3c3" ,
479+ processIdentifier = " 8115ec91-3e5e-4d8a-816d-cc40306f9822" ,
480+ envelopeType = SupportedEnvelopeType .SESSION ,
481+ payloadType = PayloadType .SESSION ,
482+ )
483+ storageService.addPayload(thirdSession, s3)
484+ waitForResurrectionAndDeliveryAttempt()
485+ // Gate-release triggers loop for each subsequent session
486+ waitForSchedulingAndDeliveryToRun()
487+ waitForSchedulingAndDeliveryToRun()
488+ val requests = executionService.getRequests<SessionPayload >()
489+ assertEquals(3 , requests.size)
490+ assertEquals(" s1" , requests[0 ].data.spans?.single()?.name)
491+ assertEquals(" s2" , requests[1 ].data.spans?.single()?.name)
492+ assertEquals(" s3" , requests[2 ].data.spans?.single()?.name)
493+ }
494+
495+ @Test
496+ fun `retryable failure keeps delivery blocked for payloads of that type with the first payload being retried` () {
497+ schedulingExecutor.blockingMode = true
498+ storageService.clearStorage()
499+ val s1 = Envelope (data = SessionPayload (spans = listOf (Span (name = " first" ))))
500+ val s2 = Envelope (data = SessionPayload (spans = listOf (Span (name = " second" ))))
501+ storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
502+ storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
503+ allSendsFail()
504+ waitForResurrectionAndDeliveryAttempt()
505+ assertEquals(1 , executionService.sendAttempts())
506+ assertEquals(" first" , executionService.getRequests<SessionPayload >().single().data.spans?.single()?.name)
507+ tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS )
508+ assertEquals(2 , executionService.sendAttempts())
509+ tickAndWaitForDeliveryAttempt((INITIAL_DELAY_MS * 2 ) - 1 )
510+ assertEquals(2 , executionService.sendAttempts())
511+ tickAndWaitForDeliveryAttempt(1 )
512+ assertEquals(3 , executionService.sendAttempts())
513+ val currentPayloads = executionService.getRequests<SessionPayload >()
514+ assertEquals(currentPayloads[0 ], currentPayloads[1 ])
515+ assertEquals(currentPayloads[0 ], currentPayloads[2 ])
516+ allSendsSucceed()
517+ tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS * 4 )
518+ waitForSchedulingAndDeliveryToRun()
519+ val requests = executionService.getRequests<SessionPayload >()
520+ assertEquals(5 , requests.size)
521+ assertEquals(" first" , requests[3 ].data.spans?.single()?.name)
522+ assertEquals(" second" , requests[4 ].data.spans?.single()?.name)
523+ }
524+
525+ @Test
526+ fun `request failures that will not retry will not block subsequent payloads` () {
527+ storageService.clearStorage()
528+ val s1 = Envelope (data = SessionPayload (spans = listOf (Span (name = " doomed" ))))
529+ val s2 = Envelope (data = SessionPayload (spans = listOf (Span (name = " survivor" ))))
530+ storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
531+ storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
532+
533+ // This makes the session not be retried
534+ executionService.constantResponse = ExecutionResult .PayloadTooLarge
535+ waitForResurrectionAndDeliveryAttempt()
536+ allSendsSucceed()
537+ waitForSchedulingAndDeliveryToRun()
538+ val requests = executionService.getRequests<SessionPayload >()
539+ assertEquals(2 , requests.size)
540+ assertEquals(" doomed" , requests[0 ].data.spans?.single()?.name)
541+ assertEquals(" survivor" , requests[1 ].data.spans?.single()?.name)
542+ }
543+
544+ @Test
545+ fun `non-retryable 4xx will not block other payloads` () {
546+ storageService.clearStorage()
547+ val s1 = Envelope (data = SessionPayload (spans = listOf (Span (name = " doomed" ))))
548+ val s2 = Envelope (data = SessionPayload (spans = listOf (Span (name = " survivor" ))))
549+ storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
550+ executionService.constantResponse = ExecutionResult .Failure (400 )
551+ waitForResurrectionAndDeliveryAttempt()
552+ assertEquals(1 , executionService.sendAttempts())
553+ assertEquals(0 , storageService.storedPayloadCount())
554+ allSendsSucceed()
555+ storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
556+ waitForPayloadIntakeAndDeliveryAttempt()
557+ val requests = executionService.getRequests<SessionPayload >()
558+ assertEquals(2 , requests.size)
559+ assertEquals(" doomed" , requests[0 ].data.spans?.single()?.name)
560+ assertEquals(" survivor" , requests[1 ].data.spans?.single()?.name)
561+ assertEquals(0 , storageService.storedPayloadCount())
562+ }
563+
564+ @Test
565+ fun `different payload types will not block each other` () {
566+ schedulingExecutor.blockingMode = true
567+ deliveryExecutor.blockingMode = true
568+ storageService.clearStorage()
569+ storageService.addFakePayload(fakeCrashStoredTelemetryMetadata)
570+ storageService.addFakePayload(fakeSessionStoredTelemetryMetadata)
571+ allSendsFail()
572+ waitForResurrectionAndDeliveryAttempt()
573+ // Both payloads will be sent because they are of different types
574+ assertEquals(2 , executionService.sendAttempts())
575+ allSendsSucceed()
576+ val fakeCrashPayload = Envelope (data = LogPayload (logs = listOf (Log (body = " crash" ))))
577+ storageService.addPayload(fakeLogStoredTelemetryMetadata, fakeCrashPayload)
578+ waitForPayloadIntakeAndDeliveryAttempt()
579+ // While the to-be retried payloads are blocked, a new payload of a different type can be sent
580+ assertEquals(3 , executionService.sendAttempts())
581+ assertEquals(fakeCrashPayload, executionService.getRequests<LogPayload >().last())
582+ }
583+
456584 @Test(expected = RejectedExecutionException ::class )
457585 fun `test shutdown` () {
458586 logger.throwOnInternalError = false
@@ -502,8 +630,7 @@ internal class SchedulingServiceImplTest {
502630 */
503631 private fun tickAndWaitForDeliveryAttempt (delayMs : Long ) {
504632 schedulingExecutor.moveForwardAndRunBlocked(delayMs)
505- schedulingExecutor.awaitExecutionCompletion()
506- deliveryExecutor.awaitExecutionCompletion()
633+ waitForSchedulingAndDeliveryToRun()
507634 }
508635
509636 /* *
@@ -512,8 +639,11 @@ internal class SchedulingServiceImplTest {
512639 private fun triggerSwitchToWifi () {
513640 networkConnectivityService.connectivityStatus = NetworkStatus .WIFI .toConnectivityStatus()
514641 schedulingExecutor.runCurrentlyBlocked()
515- schedulingExecutor.awaitExecutionCompletion()
516- deliveryExecutor.awaitExecutionCompletion()
642+ waitForSchedulingAndDeliveryToRun()
643+ waitForSchedulingAndDeliveryToRun()
644+ }
645+
646+ private fun waitForSchedulingAndDeliveryToRun () {
517647 schedulingExecutor.awaitExecutionCompletion()
518648 deliveryExecutor.awaitExecutionCompletion()
519649 }
0 commit comments