Skip to content

Commit c923770

Browse files
committed
Streamline post-intake cleanup
1 parent 03518e0 commit c923770

File tree

7 files changed

+198
-70
lines changed

7 files changed

+198
-70
lines changed

embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/resurrection/PayloadResurrectionServiceImpl.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ internal class PayloadResurrectionServiceImpl(
225225
if (resurrectedPayload != null) {
226226
val task = intakeService.take(
227227
intake = resurrectedPayload,
228-
metadata = copy(complete = true)
228+
metadata = copy(complete = true),
229+
staleEntry = this,
229230
)
230231
try {
231232
task.get(5, TimeUnit.SECONDS)

embrace-android-core/src/main/kotlin/io/embrace/android/embracesdk/internal/session/orchestrator/PayloadStoreImpl.kt

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,44 @@ internal class PayloadStoreImpl(
2828
envelope: Envelope<SessionPayload>,
2929
transitionType: TransitionType,
3030
) {
31-
intakeService.take(envelope, createMetadata(SupportedEnvelopeType.SESSION, payloadType = PayloadType.SESSION))
31+
intakeService.take(
32+
intake = envelope,
33+
metadata = createMetadata(
34+
type = SupportedEnvelopeType.SESSION,
35+
payloadType = PayloadType.SESSION
36+
)
37+
)
3238
}
3339

3440
override fun cacheSessionSnapshot(envelope: Envelope<SessionPayload>) {
3541
intakeService.take(
36-
envelope,
37-
createMetadata(SupportedEnvelopeType.SESSION, complete = false, payloadType = PayloadType.SESSION)
42+
intake = envelope,
43+
metadata = createMetadata(
44+
type = SupportedEnvelopeType.SESSION,
45+
complete = false,
46+
payloadType = PayloadType.SESSION
47+
)
3848
)
3949
}
4050

4151
override fun storeLogPayload(envelope: Envelope<LogPayload>, attemptImmediateRequest: Boolean) {
4252
val type = findSupportedEnvelopeType(envelope.data.logs)
4353
val payloadType = getPayloadType(envelope)
4454
val payloadTypesHeader = getPayloadTypesHeader(envelope)
45-
intakeService.take(envelope, createMetadata(type, payloadType = payloadType, payloadTypesHeader = payloadTypesHeader))
55+
intakeService.take(
56+
intake = envelope,
57+
metadata = createMetadata(
58+
type = type,
59+
payloadType = payloadType,
60+
payloadTypesHeader = payloadTypesHeader
61+
)
62+
)
4663
}
4764

4865
override fun storeAttachment(envelope: Envelope<Pair<String, ByteArray>>) {
4966
intakeService.take(
50-
envelope,
51-
createMetadata(
67+
intake = envelope,
68+
metadata = createMetadata(
5269
type = SupportedEnvelopeType.ATTACHMENT,
5370
payloadType = PayloadType.ATTACHMENT
5471
)

embrace-android-core/src/test/kotlin/io/embrace/android/embracesdk/internal/resurrection/PayloadResurrectionServiceImplTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,11 @@ class PayloadResurrectionServiceImplTest {
430430
fun `resurrection timeout logged when future throws timeout on get`() {
431431
val hangingIntakeService = object : IntakeService {
432432
override fun shutdown() {}
433-
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*> {
433+
override fun take(
434+
intake: Envelope<*>,
435+
metadata: StoredTelemetryMetadata,
436+
staleEntry: StoredTelemetryMetadata?
437+
): Future<*> {
434438
return object : Future<Unit> {
435439
override fun cancel(mayInterruptIfRunning: Boolean) = false
436440
override fun isCancelled() = false

embrace-android-delivery-fakes/src/main/kotlin/io/embrace/android/embracesdk/fakes/FakeIntakeService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class FakeIntakeService : IntakeService {
2525
shutdownCount++
2626
}
2727

28-
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*> {
28+
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata, staleEntry: StoredTelemetryMetadata?): Future<*> {
2929
val dst = when (metadata.complete) {
3030
true -> intakeList
3131
false -> cacheList

embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/intake/IntakeService.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@ import java.util.concurrent.Future
1212
* 1. Cache arbitrary payload types to disk (encoding priority & other metadata in the filename)
1313
* 2. Clean up any telemetry that exceeds our max disk usage policy
1414
* 3. Notify the scheduling service after a payload is stored to disk & is ready to send
15-
* 4. Handle process termination gracefully by waiting until all payloads in the queue have been
16-
* written to disk
15+
* 4. Handle process termination gracefully by waiting until all payloads in the queue have been written to disk
16+
* 5. If required, clean up any cached data that is no longer needed after the intake is successful
1717
*/
1818
interface IntakeService : Shutdownable {
1919

2020
/**
21-
* Stores a payload on disk as its JSON representation.
21+
* Stores the payload [intake] on disk as its JSON representation and associate it in the storage layer with [metadata].
22+
*
23+
* If [staleEntry] is non-null, the payload associated with it will be deleted once the new payload is successfully stored.
2224
*/
23-
fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*>
25+
fun take(
26+
intake: Envelope<*>,
27+
metadata: StoredTelemetryMetadata,
28+
staleEntry: StoredTelemetryMetadata? = null,
29+
): Future<*>
2430
}

embrace-android-delivery/src/main/kotlin/io/embrace/android/embracesdk/internal/delivery/intake/IntakeServiceImpl.kt

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,24 @@ class IntakeServiceImpl(
2626
) : IntakeService {
2727

2828
private val cachingTasks: MutableMap<SupportedEnvelopeType, Future<*>> = ConcurrentHashMap()
29-
private val cacheReferences: MutableMap<SupportedEnvelopeType, StoredTelemetryMetadata> = ConcurrentHashMap()
29+
private val lastCachedEntry: MutableMap<SupportedEnvelopeType, StoredTelemetryMetadata> = ConcurrentHashMap()
3030

3131
override fun shutdown() {
3232
worker.shutdownAndWait(shutdownTimeoutMs)
3333
}
3434

35-
override fun take(intake: Envelope<*>, metadata: StoredTelemetryMetadata): Future<*> {
35+
override fun take(
36+
intake: Envelope<*>,
37+
metadata: StoredTelemetryMetadata,
38+
staleEntry: StoredTelemetryMetadata?,
39+
): Future<*> {
3640
deliveryTracer?.onTake(metadata)
3741
val future = worker.submit(metadata) {
38-
processIntake(intake, metadata)
42+
processIntake(
43+
intake = intake,
44+
metadata = metadata,
45+
staleEntry = staleEntry
46+
)
3947
}
4048

4149
// cancel any cache attempts that are already pending to avoid unnecessary I/O.
@@ -52,6 +60,7 @@ class IntakeServiceImpl(
5260
private fun processIntake(
5361
intake: Envelope<*>,
5462
metadata: StoredTelemetryMetadata,
63+
staleEntry: StoredTelemetryMetadata?,
5564
) {
5665
try {
5766
val service = when {
@@ -67,25 +76,34 @@ class IntakeServiceImpl(
6776
storeAttachment(stream, pair.second, pair.first)
6877
}
6978
}
70-
val lastReference = cacheReferences[metadata.envelopeType]
79+
80+
/**
81+
* Determine which cache entry to clean up:
82+
*
83+
* - Use [staleEntry] if caller explicitly tells the system that a particular payload will be stale after intake
84+
* - For complete payloads (i.e. ready for delivery), delete last cached entry for the given envelope type and don't replace it
85+
* - For incomplete payloads (i.e. cached snapshots not ready for delivery), delete last cached entry and replace it with intake
86+
*/
87+
val entryToCleanup = staleEntry
88+
?: if (metadata.complete) {
89+
// Take the last cached entry and don't replace it
90+
lastCachedEntry.remove(metadata.envelopeType)
91+
} else {
92+
// Take the last cached entry and replace it with the new intake
93+
lastCachedEntry.put(metadata.envelopeType, metadata)
94+
}
7195

7296
if (metadata.complete) {
7397
deliveryTracer?.onPayloadIntake(metadata)
7498
schedulingService.onPayloadIntake()
75-
} else {
76-
cacheReferences[metadata.envelopeType] = metadata
77-
if (!cacheableEnvelopeTypes.contains(metadata.envelopeType)) {
78-
logger.trackInternalError(
79-
InternalErrorType.INTAKE_UNEXPECTED_TYPE,
80-
IllegalStateException("Unexpected envelope type cache attempt: ${metadata.envelopeType}"),
81-
)
82-
}
99+
} else if (!cacheableEnvelopeTypes.contains(metadata.envelopeType)) {
100+
logger.trackInternalError(
101+
InternalErrorType.INTAKE_UNEXPECTED_TYPE,
102+
IllegalStateException("Unexpected envelope type cache attempt: ${metadata.envelopeType}"),
103+
)
83104
}
84105

85-
// Clean up any previously cached payload of the current type.
86-
// If the newly saved payload is complete, the cached copy is no longer needed. If it's a cache attempt,
87-
// the old copy is stale. Either way, it should be deleted.
88-
lastReference?.let {
106+
entryToCleanup?.let {
89107
cacheStorageService.delete(it)
90108
}
91109
} catch (exc: Throwable) {

embrace-android-delivery/src/test/kotlin/io/embrace/android/embracesdk/internal/delivery/intake/IntakeServiceImplTest.kt

Lines changed: 123 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -284,67 +284,149 @@ class IntakeServiceImplTest {
284284
}
285285

286286
@Test
287-
fun `only cached session payloads cleaned up and only by another session payload intake`() {
287+
fun `previous cached session snapshot is cleaned up automatically by IntakeServiceImpl`() {
288288
executorService.blockingMode = false
289-
val cache1 = StoredTelemetryMetadata(clock.now(), UUID, PROCESS_ID, SESSION, complete = false).apply {
290-
intakeService.take(
291-
intake = sessionEnvelope,
292-
metadata = this
293-
)
289+
val snapshot1 = StoredTelemetryMetadata(
290+
timestamp = clock.now(),
291+
uuid = UUID,
292+
processIdentifier = PROCESS_ID,
293+
envelopeType = SESSION,
294+
complete = false
295+
).apply {
296+
intakeService.take(intake = sessionEnvelope, metadata = this)
294297
}
295298

299+
// Log payload intake doesn't affect session cache
296300
intakeService.take(
297301
intake = logEnvelope,
298-
metadata = StoredTelemetryMetadata(clock.tick(), UUID, PROCESS_ID, LOG, complete = true)
299-
)
300-
301-
assertTrue(cacheStorageService.storedFilenames().contains(cache1.filename))
302-
303-
val cache2 = StoredTelemetryMetadata(clock.tick(), UUID, PROCESS_ID, SESSION, complete = false).apply {
304-
intakeService.take(
305-
intake = sessionEnvelope,
306-
metadata = this
302+
metadata = StoredTelemetryMetadata(
303+
timestamp = clock.tick(),
304+
uuid = UUID,
305+
processIdentifier = PROCESS_ID,
306+
envelopeType = LOG,
307+
complete = true
307308
)
309+
)
310+
assertTrue(cacheStorageService.storedFilenames().contains(snapshot1.filename))
311+
312+
// New snapshot replaces old one
313+
val snapshot2 = StoredTelemetryMetadata(
314+
timestamp = clock.tick(),
315+
uuid = UUID,
316+
processIdentifier = PROCESS_ID,
317+
envelopeType = SESSION,
318+
complete = false
319+
).apply {
320+
intakeService.take(intake = sessionEnvelope, metadata = this)
308321
}
322+
assertFalse(cacheStorageService.storedFilenames().contains(snapshot1.filename))
323+
assertTrue(cacheStorageService.storedFilenames().contains(snapshot2.filename))
324+
325+
// Complete session payload cleans up snapshot
326+
val session = StoredTelemetryMetadata(
327+
timestamp = clock.tick(),
328+
uuid = UUID,
329+
processIdentifier = PROCESS_ID,
330+
envelopeType = SESSION,
331+
complete = true
332+
).apply {
333+
intakeService.take(intake = sessionEnvelope, metadata = this)
334+
}
335+
assertFalse(cacheStorageService.storedFilenames().contains(snapshot2.filename))
336+
assertTrue(payloadStorageService.storedFilenames().contains(session.filename))
337+
assertTrue(logger.internalErrorMessages.isEmpty())
338+
}
309339

310-
assertFalse(cacheStorageService.storedFilenames().contains(cache1.filename))
311-
assertTrue(cacheStorageService.storedFilenames().contains(cache2.filename))
340+
@Test
341+
fun `new empty crash envelope caching will remove the old copy automatically`() {
342+
executorService.blockingMode = false
343+
val snapshot1 = StoredTelemetryMetadata(
344+
timestamp = clock.now(),
345+
uuid = UUID,
346+
processIdentifier = PROCESS_ID,
347+
envelopeType = CRASH,
348+
complete = false,
349+
payloadType = PayloadType.UNKNOWN
350+
).apply {
351+
intakeService.take(intake = logEnvelope, metadata = this)
352+
}
312353

313-
val session = StoredTelemetryMetadata(clock.tick(), UUID, PROCESS_ID, SESSION, complete = true).apply {
314-
intakeService.take(
315-
intake = sessionEnvelope,
316-
metadata = this
317-
)
354+
assertEquals(snapshot1.filename, cacheStorageService.storedFilenames().single())
355+
356+
val snapshot2 = StoredTelemetryMetadata(
357+
timestamp = clock.tick(),
358+
uuid = UUID,
359+
processIdentifier = PROCESS_ID,
360+
envelopeType = CRASH,
361+
complete = false,
362+
payloadType = PayloadType.UNKNOWN
363+
).apply {
364+
intakeService.take(intake = logEnvelope, metadata = this)
318365
}
319366

320-
assertFalse(cacheStorageService.storedFilenames().contains(cache2.filename))
321-
assertTrue(payloadStorageService.storedFilenames().contains(session.filename))
367+
assertEquals(snapshot2.filename, cacheStorageService.storedFilenames().single())
322368
assertTrue(logger.internalErrorMessages.isEmpty())
323369
}
324370

325371
@Test
326-
fun `new empty crash envelope caching will remove the old copy`() {
372+
fun `explicit staleEntry takes precedence over internal cache tracking`() {
327373
executorService.blockingMode = false
328-
val cache1 =
329-
StoredTelemetryMetadata(clock.now(), UUID, PROCESS_ID, CRASH, false, PayloadType.UNKNOWN).apply {
330-
intakeService.take(
331-
intake = logEnvelope,
332-
metadata = this
333-
)
334-
}
335374

336-
assertEquals(cache1.filename, cacheStorageService.storedFilenames().single())
375+
// Cache a session snapshot — tracked internally by IntakeServiceImpl
376+
val snapshot = StoredTelemetryMetadata(
377+
timestamp = clock.now(),
378+
uuid = UUID,
379+
processIdentifier = PROCESS_ID,
380+
envelopeType = SESSION,
381+
complete = false
382+
).apply {
383+
intakeService.take(intake = sessionEnvelope, metadata = this)
384+
}
385+
assertEquals(1, cacheStorageService.storedPayloadCount())
337386

338-
val cache2 =
339-
StoredTelemetryMetadata(clock.tick(), UUID, PROCESS_ID, CRASH, false, PayloadType.UNKNOWN).apply {
340-
intakeService.take(
341-
intake = logEnvelope,
342-
metadata = this
343-
)
387+
// Resurrection provides an explicit staleEntry for a different metadata.
388+
// The explicit staleEntry should be deleted, NOT the internally tracked snapshot.
389+
val resurrectionSource = StoredTelemetryMetadata(
390+
timestamp = clock.tick(),
391+
uuid = "other-uuid",
392+
processIdentifier = "old-pid",
393+
envelopeType = SESSION,
394+
complete = false
395+
).apply {
396+
cacheStorageService.store(this) {
397+
it.write("old".toByteArray())
344398
}
399+
}
400+
assertEquals(2, cacheStorageService.storedPayloadCount())
345401

346-
assertEquals(cache2.filename, cacheStorageService.storedFilenames().single())
347-
assertTrue(logger.internalErrorMessages.isEmpty())
402+
intakeService.take(
403+
intake = sessionEnvelope,
404+
metadata = StoredTelemetryMetadata(
405+
timestamp = clock.tick(),
406+
uuid = UUID,
407+
processIdentifier = PROCESS_ID,
408+
envelopeType = SESSION,
409+
complete = true
410+
),
411+
staleEntry = resurrectionSource
412+
)
413+
414+
// resurrectionSource was deleted (explicit staleEntry), snapshot is still present
415+
assertFalse(cacheStorageService.storedFilenames().contains(resurrectionSource.filename))
416+
assertTrue(cacheStorageService.storedFilenames().contains(snapshot.filename))
417+
assertEquals(1, payloadStorageService.storedPayloadCount())
418+
}
419+
420+
@Test
421+
fun `complete intake with no prior cached or not cacheable payloads deletes nothing`() {
422+
executorService.blockingMode = false
423+
424+
intakeService.take(intake = sessionEnvelope, metadata = sessionMetadata)
425+
intakeService.take(intake = logEnvelope, metadata = logMetadata)
426+
427+
assertEquals(2, payloadStorageService.storedPayloadCount())
428+
assertEquals(0, cacheStorageService.storedPayloadCount())
429+
assertEquals(0, cacheStorageService.deleteCount.get())
348430
}
349431

350432
@Test

0 commit comments

Comments
 (0)