Skip to content

Commit ed46827

Browse files
committed
Delivery payload types in order that they were received
1 parent 22ee819 commit ed46827

File tree

2 files changed

+201
-56
lines changed

2 files changed

+201
-56
lines changed

embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImpl.kt

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class SchedulingServiceImpl(
4040
private val deleteInProgress: MutableSet<StoredTelemetryMetadata> = Collections.newSetFromMap(ConcurrentHashMap())
4141
private val payloadsToRetry: MutableMap<StoredTelemetryMetadata, RetryInstance> = ConcurrentHashMap()
4242
private val resurrectionComplete = AtomicBoolean(false)
43+
private val payloadsInProgress = ConcurrentHashMap<SupportedEnvelopeType, StoredTelemetryMetadata>()
4344

4445
override fun onPayloadIntake() {
4546
startDeliveryLoop()
@@ -111,6 +112,7 @@ class SchedulingServiceImpl(
111112
payload.run {
112113
if (eligibleForSending() && connectionStatus.ready()) {
113114
envelopeType.endpoint.updateBlockedEndpoint()
115+
payloadsInProgress[envelopeType] = this
114116
queueDelivery(this)
115117
}
116118
}
@@ -191,47 +193,55 @@ class SchedulingServiceImpl(
191193
}
192194

193195
deliveryTracer?.onPayloadResult(payload, result)
196+
result.processDeliveryResult(payload)
197+
activeSends.remove(payload)
198+
result
199+
}
200+
}
194201

195-
// If the request failed because the SDK cannot reach the Embrace server,
196-
// update the connection status to prevent delivery attempt
197-
if (result.failedToConnect()) {
198-
val nextConnectionAttemptTime = connectionStatus.block()
199-
scheduleDeliveryLoopStart(nextConnectionAttemptTime)
200-
} else if (result.connectedToServer()) {
201-
connectionStatus.connectionValidated()
202+
private fun ExecutionResult.processDeliveryResult(
203+
payload: StoredTelemetryMetadata,
204+
) {
205+
// If the request failed because the SDK cannot reach the Embrace server,
206+
// update the connection status to prevent delivery attempt
207+
if (failedToConnect()) {
208+
val nextConnectionAttemptTime = connectionStatus.block()
209+
scheduleDeliveryLoopStart(nextConnectionAttemptTime)
210+
} else if (connectedToServer()) {
211+
connectionStatus.connectionValidated()
212+
}
213+
214+
if (failedToConnect() || this is ExecutionResult.NetworkNotReady) {
215+
connectionStatus.payloadBlocked(payload)
216+
} else if (!shouldRetry) {
217+
// If the response is such that we should not ever retry the delivery of this payload,
218+
// delete it from both the in memory retry payloads map and on disk
219+
payloadsToRetry.remove(payload)
220+
deleteInProgress.add(payload)
221+
storageService.delete(payload) {
222+
deleteInProgress.remove(payload)
223+
}
224+
} else {
225+
// If delivery of this payload should be retried, add or replace the entry in the retry map
226+
// with the new values for how many times it has failed, and when the next retry should happen
227+
val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0
228+
val nextRetryTimeMs = if (this is ExecutionResult.TooManyRequests && retryAfterMs != null) {
229+
val unblockedTimestampMs = clock.now() + retryAfterMs
230+
blockedEndpoints[endpoint] = unblockedTimestampMs
231+
unblockedTimestampMs + 1L
232+
} else {
233+
clock.calculateNextRetryTime(retryAttempts = retryAttempts)
202234
}
203235

204-
with(result) {
205-
if (failedToConnect() || this is ExecutionResult.NetworkNotReady) {
206-
connectionStatus.payloadBlocked(payload)
207-
} else if (!shouldRetry) {
208-
// If the response is such that we should not ever retry the delivery of this payload,
209-
// delete it from both the in memory retry payloads map and on disk
210-
payloadsToRetry.remove(payload)
211-
deleteInProgress.add(payload)
212-
storageService.delete(payload) {
213-
deleteInProgress.remove(payload)
214-
}
215-
} else {
216-
// If delivery of this payload should be retried, add or replace the entry in the retry map
217-
// with the new values for how many times it has failed, and when the next retry should happen
218-
val retryAttempts = payloadsToRetry[payload]?.failedAttempts ?: 0
219-
val nextRetryTimeMs = if (this is ExecutionResult.TooManyRequests && retryAfterMs != null) {
220-
val unblockedTimestampMs = clock.now() + retryAfterMs
221-
blockedEndpoints[endpoint] = unblockedTimestampMs
222-
unblockedTimestampMs + 1L
223-
} else {
224-
clock.calculateNextRetryTime(retryAttempts = retryAttempts)
225-
}
236+
payloadsToRetry[payload] = RetryInstance(
237+
failedAttempts = retryAttempts + 1,
238+
nextRetryTimeMs = nextRetryTimeMs
239+
)
240+
}
226241

227-
payloadsToRetry[payload] = RetryInstance(
228-
failedAttempts = retryAttempts + 1,
229-
nextRetryTimeMs = nextRetryTimeMs
230-
)
231-
}
232-
}
233-
activeSends.remove(payload)
234-
result
242+
if (!shouldRetry) {
243+
payloadsInProgress.remove(payload.envelopeType)
244+
startDeliveryLoop()
235245
}
236246
}
237247

@@ -250,6 +260,11 @@ class SchedulingServiceImpl(
250260
return false
251261
}
252262

263+
val activePayload = payloadsInProgress[envelopeType]
264+
if (activePayload != null && activePayload != this) {
265+
return false
266+
}
267+
253268
return payloadsToRetry[this]?.run {
254269
clock.now() >= nextRetryTimeMs
255270
} ?: true

embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/scheduling/SchedulingServiceImplTest.kt

Lines changed: 149 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@ import io.embrace.android.embracesdk.fixtures.fakeSessionStoredTelemetryMetadata
1313
import io.embrace.android.embracesdk.internal.comms.api.Endpoint
1414
import io.embrace.android.embracesdk.internal.comms.delivery.NetworkStatus
1515
import io.embrace.android.embracesdk.internal.comms.delivery.toConnectivityStatus
16+
import io.embrace.android.embracesdk.internal.delivery.PayloadType
17+
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
18+
import io.embrace.android.embracesdk.internal.delivery.SupportedEnvelopeType
1619
import io.embrace.android.embracesdk.internal.delivery.execution.ExecutionResult
1720
import io.embrace.android.embracesdk.internal.delivery.scheduling.SchedulingServiceImpl.Companion.INITIAL_DELAY_MS
1821
import io.embrace.android.embracesdk.internal.payload.Envelope
22+
import io.embrace.android.embracesdk.internal.payload.Log
23+
import io.embrace.android.embracesdk.internal.payload.LogPayload
1924
import io.embrace.android.embracesdk.internal.payload.SessionPayload
2025
import io.embrace.android.embracesdk.internal.payload.Span
2126
import io.embrace.android.embracesdk.internal.worker.BackgroundWorker
@@ -68,11 +73,14 @@ internal class SchedulingServiceImplTest {
6873
@Test
6974
fun `new payload will trigger new delivery loop if the previous one is done`() {
7075
schedulingExecutor.blockingMode = true
76+
val countBefore = schedulingExecutor.submitCount
7177
schedulingService.onResurrectionComplete()
72-
assertEquals(1, schedulingExecutor.submitCount)
78+
assertEquals(countBefore + 1, schedulingExecutor.submitCount)
7379
schedulingExecutor.awaitExecutionCompletion()
7480
schedulingService.onPayloadIntake()
75-
assertEquals(3, schedulingExecutor.submitCount)
81+
// The gate-release for the session's successful delivery also submits a loop,
82+
// so the total is: resurrection loop + finally-block submit + intake loop
83+
assertTrue(schedulingExecutor.submitCount > countBefore + 1)
7684
}
7785

7886
@Test
@@ -150,6 +158,9 @@ internal class SchedulingServiceImplTest {
150158
waitForPayloadIntake()
151159
deliveryExecutor.blockingMode = false
152160
deliveryExecutor.awaitExecutionCompletion()
161+
// With ordered delivery, only 1 session + 1 log are sent in the first pass.
162+
// The gate-release triggers a new loop that picks up the second session.
163+
waitForSchedulingAndDeliveryToRun()
153164
assertEquals(3, executionService.sendAttempts())
154165
}
155166

@@ -246,8 +257,7 @@ internal class SchedulingServiceImplTest {
246257
assertEquals(0, executionService.sendAttempts())
247258
assertEquals(2, storageService.storedPayloadCount())
248259
networkConnectivityService.connectivityStatus = NetworkStatus.WIFI.toConnectivityStatus()
249-
schedulingExecutor.awaitExecutionCompletion()
250-
deliveryExecutor.awaitExecutionCompletion()
260+
waitForSchedulingAndDeliveryToRun()
251261
assertEquals(2, executionService.sendAttempts())
252262
assertEquals(0, storageService.storedPayloadCount())
253263
}
@@ -305,15 +315,19 @@ internal class SchedulingServiceImplTest {
305315
}
306316

307317
@Test
308-
fun `connection timeout doesn't further delivery attempts`() {
318+
fun `connection timeout schedules retry without blocking permanently`() {
319+
schedulingExecutor.blockingMode = true
309320
allSendsTimeout()
310321
waitForResurrectionAndDeliveryAttempt()
311322
assertEquals(2, executionService.sendAttempts())
312323
assertEquals(2, storageService.storedPayloadCount())
313-
storageService.addFakePayload(fakeSessionStoredTelemetryMetadata2)
314-
waitForPayloadIntakeAndDeliveryAttempt()
315-
assertEquals(3, executionService.sendAttempts())
316-
assertEquals(3, storageService.storedPayloadCount())
324+
// After retry delay, both payloads should be retried
325+
allSendsSucceed()
326+
tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS + 1)
327+
// Gate-release from first retry triggers loop for second
328+
waitForSchedulingAndDeliveryToRun()
329+
assertEquals(4, executionService.sendAttempts())
330+
assertEquals(0, storageService.storedPayloadCount())
317331
}
318332

319333
@Test
@@ -401,8 +415,7 @@ internal class SchedulingServiceImplTest {
401415
@Test
402416
fun `non-crash payloads held until resurrection completes`() {
403417
schedulingService.onPayloadIntake()
404-
schedulingExecutor.awaitExecutionCompletion()
405-
deliveryExecutor.awaitExecutionCompletion()
418+
waitForSchedulingAndDeliveryToRun()
406419
assertEquals(0, executionService.sendAttempts())
407420
assertEquals(2, storageService.storedPayloadCount())
408421
waitForResurrectionAndDeliveryAttempt()
@@ -415,8 +428,7 @@ internal class SchedulingServiceImplTest {
415428
storageService.clearStorage()
416429
storageService.addFakePayload(fakeCrashStoredTelemetryMetadata)
417430
schedulingService.onPayloadIntake()
418-
schedulingExecutor.awaitExecutionCompletion()
419-
deliveryExecutor.awaitExecutionCompletion()
431+
waitForSchedulingAndDeliveryToRun()
420432
assertEquals(1, executionService.sendAttempts())
421433
assertEquals(0, storageService.storedPayloadCount())
422434
}
@@ -428,8 +440,7 @@ internal class SchedulingServiceImplTest {
428440
val newerPayload = Envelope(data = SessionPayload(spans = listOf(Span(name = "newer"))))
429441
storageService.addPayload(fakeSessionStoredTelemetryMetadata2, newerPayload)
430442
schedulingService.onPayloadIntake()
431-
schedulingExecutor.awaitExecutionCompletion()
432-
deliveryExecutor.awaitExecutionCompletion()
443+
waitForSchedulingAndDeliveryToRun()
433444
assertEquals(0, executionService.sendAttempts())
434445

435446
// Simulate resurrection completing and adding an older session
@@ -453,6 +464,123 @@ internal class SchedulingServiceImplTest {
453464
assertEquals(countAfterFirst, schedulingExecutor.submitCount)
454465
}
455466

467+
@Test
468+
fun `session payloads are delivered in timestamp order`() {
469+
storageService.clearStorage()
470+
val s1 = Envelope(data = SessionPayload(spans = listOf(Span(name = "s1"))))
471+
val s2 = Envelope(data = SessionPayload(spans = listOf(Span(name = "s2"))))
472+
val s3 = Envelope(data = SessionPayload(spans = listOf(Span(name = "s3"))))
473+
// Add payloads to the queue out of order
474+
storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
475+
storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
476+
val thirdSession = StoredTelemetryMetadata(
477+
timestamp = clock.now() + 20_000L,
478+
uuid = "c3c3c3c3-c3c3-c3c3-c3c3-c3c3c3c3c3c3",
479+
processIdentifier = "8115ec91-3e5e-4d8a-816d-cc40306f9822",
480+
envelopeType = SupportedEnvelopeType.SESSION,
481+
payloadType = PayloadType.SESSION,
482+
)
483+
storageService.addPayload(thirdSession, s3)
484+
waitForResurrectionAndDeliveryAttempt()
485+
// Gate-release triggers loop for each subsequent session
486+
waitForSchedulingAndDeliveryToRun()
487+
waitForSchedulingAndDeliveryToRun()
488+
val requests = executionService.getRequests<SessionPayload>()
489+
assertEquals(3, requests.size)
490+
assertEquals("s1", requests[0].data.spans?.single()?.name)
491+
assertEquals("s2", requests[1].data.spans?.single()?.name)
492+
assertEquals("s3", requests[2].data.spans?.single()?.name)
493+
}
494+
495+
@Test
496+
fun `retryable failure keeps delivery blocked for payloads of that type with the first payload being retried`() {
497+
schedulingExecutor.blockingMode = true
498+
storageService.clearStorage()
499+
val s1 = Envelope(data = SessionPayload(spans = listOf(Span(name = "first"))))
500+
val s2 = Envelope(data = SessionPayload(spans = listOf(Span(name = "second"))))
501+
storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
502+
storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
503+
allSendsFail()
504+
waitForResurrectionAndDeliveryAttempt()
505+
assertEquals(1, executionService.sendAttempts())
506+
assertEquals("first", executionService.getRequests<SessionPayload>().single().data.spans?.single()?.name)
507+
tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS)
508+
assertEquals(2, executionService.sendAttempts())
509+
tickAndWaitForDeliveryAttempt((INITIAL_DELAY_MS * 2) - 1)
510+
assertEquals(2, executionService.sendAttempts())
511+
tickAndWaitForDeliveryAttempt(1)
512+
assertEquals(3, executionService.sendAttempts())
513+
val currentPayloads = executionService.getRequests<SessionPayload>()
514+
assertEquals(currentPayloads[0], currentPayloads[1])
515+
assertEquals(currentPayloads[0], currentPayloads[2])
516+
allSendsSucceed()
517+
tickAndWaitForDeliveryAttempt(INITIAL_DELAY_MS * 4)
518+
waitForSchedulingAndDeliveryToRun()
519+
val requests = executionService.getRequests<SessionPayload>()
520+
assertEquals(5, requests.size)
521+
assertEquals("first", requests[3].data.spans?.single()?.name)
522+
assertEquals("second", requests[4].data.spans?.single()?.name)
523+
}
524+
525+
@Test
526+
fun `request failures that will not retry will not block subsequent payloads`() {
527+
storageService.clearStorage()
528+
val s1 = Envelope(data = SessionPayload(spans = listOf(Span(name = "doomed"))))
529+
val s2 = Envelope(data = SessionPayload(spans = listOf(Span(name = "survivor"))))
530+
storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
531+
storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
532+
533+
// This makes the session not be retried
534+
executionService.constantResponse = ExecutionResult.PayloadTooLarge
535+
waitForResurrectionAndDeliveryAttempt()
536+
allSendsSucceed()
537+
waitForSchedulingAndDeliveryToRun()
538+
val requests = executionService.getRequests<SessionPayload>()
539+
assertEquals(2, requests.size)
540+
assertEquals("doomed", requests[0].data.spans?.single()?.name)
541+
assertEquals("survivor", requests[1].data.spans?.single()?.name)
542+
}
543+
544+
@Test
545+
fun `non-retryable 4xx will not block other payloads`() {
546+
storageService.clearStorage()
547+
val s1 = Envelope(data = SessionPayload(spans = listOf(Span(name = "doomed"))))
548+
val s2 = Envelope(data = SessionPayload(spans = listOf(Span(name = "survivor"))))
549+
storageService.addPayload(fakeSessionStoredTelemetryMetadata, s1)
550+
executionService.constantResponse = ExecutionResult.Failure(400)
551+
waitForResurrectionAndDeliveryAttempt()
552+
assertEquals(1, executionService.sendAttempts())
553+
assertEquals(0, storageService.storedPayloadCount())
554+
allSendsSucceed()
555+
storageService.addPayload(fakeSessionStoredTelemetryMetadata2, s2)
556+
waitForPayloadIntakeAndDeliveryAttempt()
557+
val requests = executionService.getRequests<SessionPayload>()
558+
assertEquals(2, requests.size)
559+
assertEquals("doomed", requests[0].data.spans?.single()?.name)
560+
assertEquals("survivor", requests[1].data.spans?.single()?.name)
561+
assertEquals(0, storageService.storedPayloadCount())
562+
}
563+
564+
@Test
565+
fun `different payload types will not block each other`() {
566+
schedulingExecutor.blockingMode = true
567+
deliveryExecutor.blockingMode = true
568+
storageService.clearStorage()
569+
storageService.addFakePayload(fakeCrashStoredTelemetryMetadata)
570+
storageService.addFakePayload(fakeSessionStoredTelemetryMetadata)
571+
allSendsFail()
572+
waitForResurrectionAndDeliveryAttempt()
573+
// Both payloads will be sent because they are of different types
574+
assertEquals(2, executionService.sendAttempts())
575+
allSendsSucceed()
576+
val fakeCrashPayload = Envelope(data = LogPayload(logs = listOf(Log(body = "crash"))))
577+
storageService.addPayload(fakeLogStoredTelemetryMetadata, fakeCrashPayload)
578+
waitForPayloadIntakeAndDeliveryAttempt()
579+
// While the to-be retried payloads are blocked, a new payload of a different type can be sent
580+
assertEquals(3, executionService.sendAttempts())
581+
assertEquals(fakeCrashPayload, executionService.getRequests<LogPayload>().last())
582+
}
583+
456584
@Test(expected = RejectedExecutionException::class)
457585
fun `test shutdown`() {
458586
logger.throwOnInternalError = false
@@ -502,8 +630,7 @@ internal class SchedulingServiceImplTest {
502630
*/
503631
private fun tickAndWaitForDeliveryAttempt(delayMs: Long) {
504632
schedulingExecutor.moveForwardAndRunBlocked(delayMs)
505-
schedulingExecutor.awaitExecutionCompletion()
506-
deliveryExecutor.awaitExecutionCompletion()
633+
waitForSchedulingAndDeliveryToRun()
507634
}
508635

509636
/**
@@ -512,8 +639,11 @@ internal class SchedulingServiceImplTest {
512639
private fun triggerSwitchToWifi() {
513640
networkConnectivityService.connectivityStatus = NetworkStatus.WIFI.toConnectivityStatus()
514641
schedulingExecutor.runCurrentlyBlocked()
515-
schedulingExecutor.awaitExecutionCompletion()
516-
deliveryExecutor.awaitExecutionCompletion()
642+
waitForSchedulingAndDeliveryToRun()
643+
waitForSchedulingAndDeliveryToRun()
644+
}
645+
646+
private fun waitForSchedulingAndDeliveryToRun() {
517647
schedulingExecutor.awaitExecutionCompletion()
518648
deliveryExecutor.awaitExecutionCompletion()
519649
}

0 commit comments

Comments
 (0)