Skip to content

Commit 6893d25

Browse files
committed
Block on payload disk write at intake when doing resurrection
1 parent 334aafb commit 6893d25

File tree

4 files changed

+30
-4
lines changed

4 files changed

+30
-4
lines changed

embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/resurrection/PayloadResurrectionServiceImpl.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import io.embrace.android.embracesdk.internal.session.getSessionSpan
3434
import io.embrace.android.embracesdk.internal.utils.Provider
3535
import io.opentelemetry.kotlin.semconv.SessionAttributes
3636
import java.util.concurrent.CopyOnWriteArrayList
37+
import java.util.concurrent.TimeUnit
38+
import java.util.concurrent.TimeoutException
3739
import java.util.zip.GZIPInputStream
3840
import kotlin.math.max
3941

@@ -222,11 +224,17 @@ internal class PayloadResurrectionServiceImpl(
222224
else -> null
223225
}
224226

227+
// Synchronously provide the payload to the IntakeService, blocking on the returned Future
225228
if (resurrectedPayload != null) {
226-
intakeService.take(
229+
val task = intakeService.take(
227230
intake = resurrectedPayload,
228231
metadata = copy(complete = true)
229232
)
233+
try {
234+
task.get(5, TimeUnit.SECONDS)
235+
} catch (e: TimeoutException) {
236+
logger.trackInternalError(InternalErrorType.INTAKE_FAIL, e)
237+
}
230238
}
231239
}
232240

embrace-android-delivery-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeIntakeService.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package io.embrace.android.embracesdk.fakes
33
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
44
import io.embrace.android.embracesdk.internal.delivery.intake.IntakeService
55
import io.embrace.android.embracesdk.internal.payload.Envelope
6+
import java.util.concurrent.Future
7+
import java.util.concurrent.TimeUnit
68

79
class FakeIntakeService : IntakeService {
810

@@ -23,11 +25,24 @@ class FakeIntakeService : IntakeService {
2325
shutdownCount++
2426
}
2527

26-
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata) {
28+
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*> {
2729
val dst = when (metadata.complete) {
2830
true -> intakeList
2931
false -> cacheList
3032
}
3133
dst.add(FakePayloadIntake(intake, metadata))
34+
return fakeFuture
35+
}
36+
37+
val fakeFuture = object : Future<Boolean> {
38+
override fun cancel(p0: Boolean): Boolean = false
39+
40+
override fun isCancelled(): Boolean = false
41+
42+
override fun isDone(): Boolean = true
43+
44+
override fun get() = true
45+
46+
override fun get(p0: Long, p1: TimeUnit) = true
3247
}
3348
}

embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/intake/IntakeService.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.embrace.android.embracesdk.internal.delivery.intake
33
import io.embrace.android.embracesdk.internal.delivery.Shutdownable
44
import io.embrace.android.embracesdk.internal.delivery.StoredTelemetryMetadata
55
import io.embrace.android.embracesdk.internal.payload.Envelope
6+
import java.util.concurrent.Future
67

78
/**
89
* IntakeService is responsible for storing telemetry payloads to disk and notifying the
@@ -19,5 +20,5 @@ interface IntakeService : Shutdownable {
1920
/**
2021
* Stores a payload on disk as its JSON representation.
2122
*/
22-
fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata)
23+
fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*>
2324
}

embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/intake/IntakeServiceImpl.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class IntakeServiceImpl(
3232
worker.shutdownAndWait(shutdownTimeoutMs)
3333
}
3434

35-
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata) {
35+
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*> {
3636
deliveryTracer?.onTake(metadata)
3737
val future = worker.submit(metadata) {
3838
processIntake(intake, metadata)
@@ -44,6 +44,8 @@ class IntakeServiceImpl(
4444
cachingTasks[metadata.envelopeType] = future
4545
prev?.cancel(false)
4646
}
47+
48+
return future
4749
}
4850

4951
@Suppress("UNCHECKED_CAST")

0 commit comments

Comments
 (0)