Skip to content

Commit 03f2fb9

Browse files
committed
fix(android): preserve sync cursors and retention
1 parent 7e8fc94 commit 03f2fb9

9 files changed

Lines changed: 366 additions & 23 deletions

File tree

android/app/src/main/java/com/screwy/igloo/data/dao/Daos.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ interface VideoDao {
158158
suspend fun getPreviousVideoId(videoId: String): String?
159159

160160
/**
161-
* Prune shorts rows (TikTok / Instagram) past retention, respecting side-table
161+
* Prune shorts rows (TikTok / Instagram) past retention, respecting saved-item
162162
* protection. YouTube rows are excluded so the Videos tab can render faded
163163
* placeholders.
164164
*/
@@ -168,7 +168,6 @@ interface VideoDao {
168168
WHERE channel_id NOT LIKE 'youtube_%'
169169
AND published_at < :cutoffMs
170170
AND NOT EXISTS (SELECT 1 FROM bookmarks WHERE video_id = videos.video_id)
171-
AND NOT EXISTS (SELECT 1 FROM moment_views WHERE video_id = videos.video_id)
172171
"""
173172
)
174173
suspend fun pruneShorts(cutoffMs: Long): Int

android/app/src/main/java/com/screwy/igloo/logs/EventDictionary.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ object EventDictionary {
4747
),
4848
"stream_fetch_retry" to EventTemplate("Stream fetch retry scheduled"),
4949
"stream_marker_stalled" to EventTemplate("Stream marker stalled — bailed out"),
50+
"stream_parse_failed" to EventTemplate("Stream entry failed to parse"),
5051
"stream_all_parses_failed" to EventTemplate("Every stream entry failed to parse"),
5152
"bundle_parse_failure" to EventTemplate("Bundle payload parse failed"),
5253
"bundle_unknown_kind" to EventTemplate("Bundle had unknown kind — ignored"),

android/app/src/main/java/com/screwy/igloo/sync/AndroidSyncMirror.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,11 @@ class AndroidSyncMirror(
353353
),
354354
)
355355
}
356+
if (ingestParseFailures > 0) {
357+
throw IllegalStateException(
358+
"Android sync item import failed for generation $generationId; preserving item marker",
359+
)
360+
}
356361
dao.markItemsImportPageComplete(
357362
generationId = generationId,
358363
importedSeq = page.items.maxOf { it.seq },

android/app/src/main/java/com/screwy/igloo/sync/InboundReconciler.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,14 @@ class InboundReconciler(
139139
)
140140
}
141141
}
142-
if (parseFailures > 0 && parseFailures == response.bundles.size && response.bundles.isNotEmpty()) {
143-
// Every bundle in the page failed — don't advance cursor (server will resend).
142+
if (parseFailures > 0) {
144143
logger.error(
145-
event = "stream_all_parses_failed",
146-
fields = mapOf("stream" to stream.cursorKey, "count" to response.bundles.size.toString()),
144+
event = "stream_parse_failed",
145+
fields = mapOf(
146+
"stream" to stream.cursorKey,
147+
"failed" to parseFailures.toString(),
148+
"count" to response.bundles.size.toString(),
149+
),
147150
)
148151
return
149152
}

android/app/src/test/java/com/screwy/igloo/data/PruneProtectionTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.robolectric.annotation.Config
2121
*
2222
* 1. YouTube `videos` rows never prune (retention clause excludes them by channel_id).
2323
* 2. Shorts `videos` rows prune when past retention AND no side-table saves them.
24-
* 3. Shorts `videos` rows survive if protected by bookmarks OR moment_views.
24+
* 3. Shorts `videos` rows survive if protected by bookmarks.
2525
* 4. Twitter `feed_items` rows respect feed_likes + bookmarks protection.
2626
*/
2727
@RunWith(RobolectricTestRunner::class)
@@ -62,12 +62,12 @@ class PruneProtectionTest {
6262
assertNotNull(db.videoDao().getById("v_tt"))
6363
}
6464

65-
@Test fun videos_shortsSurvive_whenMomentViewed() = runBlocking {
65+
@Test fun videos_shortsPrune_whenOnlyMomentViewed() = runBlocking {
6666
db.videoDao().upsert(VideoEntity("v_tt", "tiktok_alice", publishedAt = 10))
6767
db.momentViewDao().upsert(MomentViewEntity("v_tt", viewedAt = 50))
6868
val deleted = db.videoDao().pruneShorts(cutoffMs = 100)
69-
assertEquals(0, deleted)
70-
assertNotNull(db.videoDao().getById("v_tt"))
69+
assertEquals(1, deleted)
70+
assertNull(db.videoDao().getById("v_tt"))
7171
}
7272

7373
// ─── Feed items: side-table protection ────────────────────────────────────

android/app/src/test/java/com/screwy/igloo/sync/AndroidSyncMirrorTest.kt

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,148 @@ class AndroidSyncMirrorTest {
13841384
)
13851385
}
13861386

1387+
@Test fun itemImportParseFailureDoesNotCompleteOrAdvanceFailedPage() = runBlocking {
1388+
val dao = db.androidSyncDao()
1389+
val firstItem = AndroidSyncItemDto(
1390+
seq = 1,
1391+
item_kind = "channels",
1392+
item_id = "sample_channel_one",
1393+
payload = BundleEnvelope(
1394+
primary_kind = "channels",
1395+
primary = buildJsonObject {
1396+
put("channel_id", "sample_channel_one")
1397+
put("source_id", "sample_one")
1398+
put("name", "Sample Channel One")
1399+
put("platform", "youtube")
1400+
},
1401+
),
1402+
)
1403+
val malformedItem = AndroidSyncItemDto(
1404+
seq = 2,
1405+
item_kind = "feed_items",
1406+
item_id = "bad_tweet",
1407+
payload = BundleEnvelope(
1408+
primary_kind = "feed_items",
1409+
primary = buildJsonObject {
1410+
put("tweet_id", "bad_tweet")
1411+
put("published_at", 2L)
1412+
},
1413+
),
1414+
)
1415+
val correctedItem = AndroidSyncItemDto(
1416+
seq = 2,
1417+
item_kind = "feed_items",
1418+
item_id = "bad_tweet",
1419+
payload = BundleEnvelope(
1420+
primary_kind = "feed_items",
1421+
primary = buildJsonObject {
1422+
put("tweet_id", "bad_tweet")
1423+
put("author_handle", "alice")
1424+
put("published_at", 2L)
1425+
},
1426+
),
1427+
)
1428+
val firstEngine = MockEngine { request ->
1429+
when (request.url.encodedPath) {
1430+
"/api/android/sync/generation/latest" -> respondJson(
1431+
AndroidSyncLatestResponse(
1432+
generation = AndroidSyncGenerationDto(
1433+
generation_id = GENERATION_ID,
1434+
created_at_ms = nowMs,
1435+
status = "published",
1436+
source_version = "test",
1437+
item_count = 2,
1438+
),
1439+
),
1440+
)
1441+
"/api/android/sync/generation/$GENERATION_ID/items" -> {
1442+
if (request.url.parameters["after"] == "1") {
1443+
respondJson(
1444+
AndroidSyncItemsResponse(
1445+
generation_id = GENERATION_ID,
1446+
items = listOf(malformedItem),
1447+
end_of_stream = true,
1448+
),
1449+
)
1450+
} else {
1451+
respondJson(
1452+
AndroidSyncItemsResponse(
1453+
generation_id = GENERATION_ID,
1454+
items = listOf(firstItem),
1455+
next = "1",
1456+
end_of_stream = false,
1457+
),
1458+
)
1459+
}
1460+
}
1461+
"/api/android/sync/generation/$GENERATION_ID/assets" -> respondJson(
1462+
AndroidSyncAssetsResponse(
1463+
generation_id = GENERATION_ID,
1464+
assets = emptyList(),
1465+
end_of_stream = true,
1466+
),
1467+
)
1468+
"/api/android/sync/health" -> respond("""{"ok":true}""", HttpStatusCode.OK, jsonHeaders())
1469+
else -> error("Unexpected request ${request.url}")
1470+
}
1471+
}
1472+
1473+
val firstResult = runCatching { buildMirror(firstEngine).syncOnce() }
1474+
1475+
assertTrue(firstResult.isFailure)
1476+
assertNotNull(db.channelDao().getById("sample_channel_one"))
1477+
assertNull(db.feedItemDao().getById("bad_tweet"))
1478+
assertEquals(1L, dao.importedItemSeq(GENERATION_ID))
1479+
assertEquals(0, dao.countItemsImportCompleteForImporter(GENERATION_ID, ANDROID_SYNC_ITEM_IMPORTER_VERSION))
1480+
waitForLog("android_sync_item_parse_failed")
1481+
client.close()
1482+
1483+
val requestedAfterMarkers = Collections.synchronizedList(mutableListOf<String?>())
1484+
val secondEngine = MockEngine { request ->
1485+
when (request.url.encodedPath) {
1486+
"/api/android/sync/generation/latest" -> respondJson(
1487+
AndroidSyncLatestResponse(
1488+
generation = AndroidSyncGenerationDto(
1489+
generation_id = GENERATION_ID,
1490+
created_at_ms = nowMs,
1491+
status = "published",
1492+
source_version = "test",
1493+
item_count = 2,
1494+
),
1495+
),
1496+
)
1497+
"/api/android/sync/generation/$GENERATION_ID/items" -> {
1498+
val after = request.url.parameters["after"]
1499+
requestedAfterMarkers += after
1500+
assertEquals("1", after)
1501+
respondJson(
1502+
AndroidSyncItemsResponse(
1503+
generation_id = GENERATION_ID,
1504+
items = listOf(correctedItem),
1505+
end_of_stream = true,
1506+
),
1507+
)
1508+
}
1509+
"/api/android/sync/generation/$GENERATION_ID/assets" -> respondJson(
1510+
AndroidSyncAssetsResponse(
1511+
generation_id = GENERATION_ID,
1512+
assets = emptyList(),
1513+
end_of_stream = true,
1514+
),
1515+
)
1516+
"/api/android/sync/health" -> respond("""{"ok":true}""", HttpStatusCode.OK, jsonHeaders())
1517+
else -> error("Unexpected request ${request.url}")
1518+
}
1519+
}
1520+
1521+
buildMirror(secondEngine).syncOnce()
1522+
1523+
assertEquals(listOf("1"), requestedAfterMarkers.toList())
1524+
assertNotNull(db.feedItemDao().getById("bad_tweet"))
1525+
assertEquals(2L, dao.importedItemSeq(GENERATION_ID))
1526+
assertEquals(1, dao.countItemsImportCompleteForImporter(GENERATION_ID, ANDROID_SYNC_ITEM_IMPORTER_VERSION))
1527+
}
1528+
13871529
@Test fun itemImportLogsPageDecodeAndIngestCounters() = runBlocking {
13881530
val firstItem = AndroidSyncItemDto(
13891531
seq = 1,
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package com.screwy.igloo.sync
2+
3+
import com.screwy.igloo.data.IglooDatabase
4+
import com.screwy.igloo.data.PreferencesRepo
5+
import com.screwy.igloo.data.RoomTestSupport
6+
import com.screwy.igloo.log.InMemoryLogSink
7+
import com.screwy.igloo.log.Logger
8+
import com.screwy.igloo.net.BundleEnvelope
9+
import com.screwy.igloo.net.ChannelsApi
10+
import com.screwy.igloo.net.DeltaResponse
11+
import com.screwy.igloo.net.FeedApi
12+
import com.screwy.igloo.net.Reachability
13+
import com.screwy.igloo.net.ShortsApi
14+
import com.screwy.igloo.net.VideoApi
15+
import com.screwy.igloo.net.iglooJson
16+
import io.ktor.client.HttpClient
17+
import io.ktor.client.engine.mock.MockEngine
18+
import io.ktor.client.engine.mock.MockRequestHandleScope
19+
import io.ktor.client.engine.mock.respond
20+
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
21+
import io.ktor.http.ContentType
22+
import io.ktor.http.HttpStatusCode
23+
import io.ktor.http.headersOf
24+
import io.ktor.serialization.kotlinx.json.json
25+
import kotlinx.coroutines.CoroutineScope
26+
import kotlinx.coroutines.Dispatchers
27+
import kotlinx.coroutines.SupervisorJob
28+
import kotlinx.coroutines.cancel
29+
import kotlinx.coroutines.cancelAndJoin
30+
import kotlinx.coroutines.delay
31+
import kotlinx.coroutines.flow.emptyFlow
32+
import kotlinx.coroutines.launch
33+
import kotlinx.coroutines.runBlocking
34+
import kotlinx.coroutines.withTimeout
35+
import kotlinx.serialization.encodeToString
36+
import kotlinx.serialization.json.buildJsonObject
37+
import kotlinx.serialization.json.put
38+
import org.junit.After
39+
import org.junit.Assert.assertNotNull
40+
import org.junit.Assert.assertNull
41+
import org.junit.Assert.assertTrue
42+
import org.junit.Before
43+
import org.junit.Test
44+
import org.junit.runner.RunWith
45+
import org.robolectric.RobolectricTestRunner
46+
import org.robolectric.annotation.Config
47+
48+
@RunWith(RobolectricTestRunner::class)
49+
@Config(sdk = [34], manifest = Config.NONE)
50+
class InboundReconcilerTest {
51+
52+
private lateinit var db: IglooDatabase
53+
private lateinit var scope: CoroutineScope
54+
private lateinit var prefs: PreferencesRepo
55+
private lateinit var logger: Logger
56+
private lateinit var logSink: InMemoryLogSink
57+
private lateinit var client: HttpClient
58+
private var nowMs: Long = 10_000L
59+
60+
@Before fun setUp() {
61+
db = RoomTestSupport.freshDb()
62+
scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
63+
prefs = PreferencesRepo(db.preferenceDao(), scope, nowMsProvider = { nowMs })
64+
logSink = InMemoryLogSink()
65+
logger = Logger(prefs = prefs, sink = logSink, scope = scope, nowMsProvider = { nowMs })
66+
}
67+
68+
@After fun tearDown() {
69+
if (::client.isInitialized) client.close()
70+
scope.cancel()
71+
db.close()
72+
}
73+
74+
@Test fun mixedParseFailurePreservesCursorBeforeNextPage() = runBlocking {
75+
runMixedParseFailureCase(endOfStream = false, nextMarker = "opaque-next")
76+
}
77+
78+
@Test fun mixedParseFailurePreservesCursorAtEndOfStream() = runBlocking {
79+
runMixedParseFailureCase(endOfStream = true, nextMarker = "opaque-final")
80+
}
81+
82+
private suspend fun runMixedParseFailureCase(endOfStream: Boolean, nextMarker: String) {
83+
val reconciler = buildReconciler(
84+
MockEngine { request ->
85+
when (request.url.encodedPath) {
86+
"/api/feed/delta" -> respondJson(
87+
DeltaResponse(
88+
bundles = listOf(
89+
BundleEnvelope(
90+
primary_kind = "feed_items",
91+
primary = buildJsonObject {
92+
put("tweet_id", "good_tweet")
93+
put("author_handle", "alice")
94+
put("published_at", 1L)
95+
},
96+
),
97+
BundleEnvelope(
98+
primary_kind = "feed_items",
99+
primary = buildJsonObject {
100+
put("tweet_id", "bad_tweet")
101+
put("published_at", 2L)
102+
},
103+
),
104+
),
105+
next_marker = nextMarker,
106+
end_of_stream = endOfStream,
107+
),
108+
)
109+
else -> error("unexpected path ${request.url.encodedPath}")
110+
}
111+
},
112+
)
113+
val job = scope.launch { reconciler.run() }
114+
try {
115+
reconciler.triggerStream(SyncStream.Feed)
116+
waitForLog("stream_parse_failed")
117+
waitForLog("inbound_pass_done")
118+
} finally {
119+
job.cancelAndJoin()
120+
}
121+
122+
assertNotNull(db.feedItemDao().getById("good_tweet"))
123+
assertNull(db.feedItemDao().getById("bad_tweet"))
124+
assertNull(db.cursorDao().get(SyncStream.Feed.cursorKey))
125+
assertTrue(logSink.snapshot().any { it.event == "stream_parse_failed" })
126+
}
127+
128+
private fun buildReconciler(engine: MockEngine): InboundReconciler {
129+
client = HttpClient(engine) {
130+
install(ContentNegotiation) {
131+
json(iglooJson)
132+
}
133+
}
134+
val baseUrlProvider = { "https://igloo.local" }
135+
val reachability = Reachability(
136+
scope = scope,
137+
probe = { true },
138+
foregroundFlow = emptyFlow(),
139+
).also { it.markOnline() }
140+
return InboundReconciler(
141+
db = db,
142+
prefs = prefs,
143+
cursorDao = db.cursorDao(),
144+
outboxDao = db.outboxDao(),
145+
feedApi = FeedApi(client, baseUrlProvider),
146+
videoApi = VideoApi(client, baseUrlProvider),
147+
shortsApi = ShortsApi(client, baseUrlProvider),
148+
channelsApi = ChannelsApi(client, baseUrlProvider),
149+
reachability = reachability,
150+
logger = logger,
151+
nowMsProvider = { nowMs },
152+
)
153+
}
154+
155+
private suspend fun waitForLog(event: String) {
156+
withTimeout(3_000L) {
157+
while (true) {
158+
if (logSink.snapshot().any { it.event == event }) return@withTimeout
159+
delay(10L)
160+
}
161+
}
162+
}
163+
164+
private inline fun <reified T> MockRequestHandleScope.respondJson(body: T) =
165+
respond(iglooJson.encodeToString(body), HttpStatusCode.OK, jsonHeaders())
166+
167+
private fun jsonHeaders() = headersOf("Content-Type", ContentType.Application.Json.toString())
168+
}

0 commit comments

Comments
 (0)