Skip to content

Commit bbe8fc3

Browse files
authored
all: smoother retry repository queuing (fixes #12179) (#12136)
1 parent 3c47a7a commit bbe8fc3

5 files changed

Lines changed: 213 additions & 103 deletions

File tree

app/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ android {
1212
applicationId "org.ole.planet.myplanet"
1313
minSdk = 26
1414
targetSdk = 36
15-
versionCode = 4964
16-
versionName = "0.49.64"
15+
versionCode = 4965
16+
versionName = "0.49.65"
1717
ndkVersion = '26.3.11579264'
1818
vectorDrawables.useSupportLibrary = true
1919
}

app/src/main/java/org/ole/planet/myplanet/di/RepositoryModule.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,17 @@ import org.ole.planet.myplanet.repository.UserRepositoryImpl
4848
import org.ole.planet.myplanet.repository.VoicesRepository
4949
import org.ole.planet.myplanet.repository.VoicesRepositoryImpl
5050

51+
import org.ole.planet.myplanet.repository.retry.RetryRepository
52+
import org.ole.planet.myplanet.repository.retry.RetryRepositoryImpl
53+
5154
@Module
5255
@InstallIn(SingletonComponent::class)
5356
abstract class RepositoryModule {
5457

58+
@Binds
59+
@Singleton
60+
abstract fun bindRetryRepository(impl: RetryRepositoryImpl): RetryRepository
61+
5562
@Binds
5663
@Singleton
5764
abstract fun bindActivitiesRepository(impl: ActivitiesRepositoryImpl): ActivitiesRepository
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.ole.planet.myplanet.repository.retry
2+
3+
import org.ole.planet.myplanet.model.RealmRetryOperation
4+
import org.ole.planet.myplanet.services.upload.UploadError
5+
6+
interface RetryRepository {
7+
suspend fun enqueue(
8+
uploadType: String,
9+
error: UploadError,
10+
payload: String,
11+
endpoint: String,
12+
httpMethod: String,
13+
dbId: String?,
14+
modelClassName: String,
15+
userId: String?
16+
)
17+
suspend fun updateAttempt(
18+
operationId: String,
19+
error: UploadError
20+
)
21+
suspend fun markInProgress(operationId: String)
22+
suspend fun markCompleted(operationId: String)
23+
suspend fun markFailed(operationId: String, errorMessage: String?, httpCode: Int?)
24+
suspend fun getPending(): List<RealmRetryOperation>
25+
suspend fun getPendingCount(): Long
26+
suspend fun cleanup()
27+
suspend fun resetAllPending()
28+
suspend fun getExistingOperation(itemId: String, uploadType: String): RealmRetryOperation?
29+
suspend fun deletePendingAndAbandonedOperations()
30+
suspend fun recoverStuckOperations()
31+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package org.ole.planet.myplanet.repository.retry
2+
3+
import javax.inject.Inject
4+
import org.ole.planet.myplanet.data.DatabaseService
5+
import org.ole.planet.myplanet.model.RealmRetryOperation
6+
import org.ole.planet.myplanet.repository.RealmRepository
7+
import org.ole.planet.myplanet.services.upload.UploadError
8+
9+
class RetryRepositoryImpl @Inject constructor(
10+
databaseService: DatabaseService
11+
) : RealmRepository(databaseService), RetryRepository {
12+
13+
override suspend fun enqueue(
14+
uploadType: String,
15+
error: UploadError,
16+
payload: String,
17+
endpoint: String,
18+
httpMethod: String,
19+
dbId: String?,
20+
modelClassName: String,
21+
userId: String?
22+
) {
23+
executeTransaction { realm ->
24+
RealmRetryOperation.createFromUploadError(
25+
realm, uploadType, error, payload, endpoint,
26+
httpMethod, dbId, modelClassName, userId
27+
)
28+
}
29+
}
30+
31+
override suspend fun updateAttempt(
32+
operationId: String,
33+
error: UploadError
34+
) {
35+
executeTransaction { realm ->
36+
realm.where(RealmRetryOperation::class.java)
37+
.equalTo("id", operationId)
38+
.findFirst()?.let { op ->
39+
op.attemptCount += 1
40+
op.lastAttemptTime = System.currentTimeMillis()
41+
op.nextRetryTime = RealmRetryOperation.calculateNextRetryTime(op.attemptCount)
42+
op.errorMessage = error.message
43+
op.httpCode = error.httpCode
44+
45+
if (op.attemptCount >= op.maxAttempts) {
46+
op.status = RealmRetryOperation.STATUS_ABANDONED
47+
}
48+
}
49+
}
50+
}
51+
52+
override suspend fun markInProgress(operationId: String) {
53+
executeTransaction { realm ->
54+
realm.where(RealmRetryOperation::class.java)
55+
.equalTo("id", operationId)
56+
.findFirst()?.let { op ->
57+
op.status = RealmRetryOperation.STATUS_IN_PROGRESS
58+
}
59+
}
60+
}
61+
62+
override suspend fun markCompleted(operationId: String) {
63+
executeTransaction { realm ->
64+
realm.where(RealmRetryOperation::class.java)
65+
.equalTo("id", operationId)
66+
.findFirst()?.let { op ->
67+
op.status = RealmRetryOperation.STATUS_COMPLETED
68+
op.lastAttemptTime = System.currentTimeMillis()
69+
}
70+
}
71+
}
72+
73+
override suspend fun markFailed(operationId: String, errorMessage: String?, httpCode: Int?) {
74+
executeTransaction { realm ->
75+
realm.where(RealmRetryOperation::class.java)
76+
.equalTo("id", operationId)
77+
.findFirst()?.let { op ->
78+
op.attemptCount += 1
79+
op.lastAttemptTime = System.currentTimeMillis()
80+
op.errorMessage = errorMessage
81+
op.httpCode = httpCode
82+
83+
if (op.attemptCount >= op.maxAttempts) {
84+
op.status = RealmRetryOperation.STATUS_ABANDONED
85+
} else {
86+
op.status = RealmRetryOperation.STATUS_PENDING
87+
op.nextRetryTime = RealmRetryOperation.calculateNextRetryTime(op.attemptCount)
88+
}
89+
}
90+
}
91+
}
92+
93+
override suspend fun getPending(): List<RealmRetryOperation> {
94+
return withRealmAsync { realm ->
95+
RealmRetryOperation.getPendingOperations(realm)
96+
}
97+
}
98+
99+
override suspend fun getPendingCount(): Long {
100+
return withRealmAsync { realm ->
101+
RealmRetryOperation.getFailedOperationsCount(realm)
102+
}
103+
}
104+
105+
override suspend fun cleanup() {
106+
executeTransaction { realm ->
107+
RealmRetryOperation.cleanupCompletedOperations(realm)
108+
}
109+
}
110+
111+
override suspend fun resetAllPending() {
112+
executeTransaction { realm ->
113+
realm.where(RealmRetryOperation::class.java)
114+
.equalTo("status", RealmRetryOperation.STATUS_PENDING)
115+
.findAll()
116+
.forEach { op ->
117+
op.nextRetryTime = System.currentTimeMillis()
118+
}
119+
}
120+
}
121+
122+
override suspend fun getExistingOperation(itemId: String, uploadType: String): RealmRetryOperation? {
123+
return withRealmAsync { realm ->
124+
realm.where(RealmRetryOperation::class.java)
125+
.equalTo("itemId", itemId)
126+
.equalTo("uploadType", uploadType)
127+
.notEqualTo("status", RealmRetryOperation.STATUS_COMPLETED)
128+
.notEqualTo("status", RealmRetryOperation.STATUS_ABANDONED)
129+
.findFirst()
130+
?.let { realm.copyFromRealm(it) }
131+
}
132+
}
133+
134+
override suspend fun deletePendingAndAbandonedOperations() {
135+
executeTransaction { realm ->
136+
realm.where(RealmRetryOperation::class.java)
137+
.equalTo("status", RealmRetryOperation.STATUS_PENDING)
138+
.or()
139+
.equalTo("status", RealmRetryOperation.STATUS_ABANDONED)
140+
.findAll()
141+
.deleteAllFromRealm()
142+
}
143+
}
144+
145+
override suspend fun recoverStuckOperations() {
146+
executeTransaction { realm ->
147+
realm.where(RealmRetryOperation::class.java)
148+
.equalTo("status", RealmRetryOperation.STATUS_IN_PROGRESS)
149+
.findAll()
150+
.forEach { op ->
151+
op.status = RealmRetryOperation.STATUS_PENDING
152+
op.nextRetryTime = System.currentTimeMillis() + 60_000 // Retry in 1 minute
153+
}
154+
}
155+
}
156+
}

app/src/main/java/org/ole/planet/myplanet/services/retry/RetryQueue.kt

Lines changed: 17 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import javax.inject.Inject
99
import javax.inject.Singleton
1010
import kotlinx.coroutines.sync.Mutex
1111
import kotlinx.coroutines.sync.withLock
12-
import org.ole.planet.myplanet.data.DatabaseService
12+
import org.ole.planet.myplanet.repository.retry.RetryRepository
1313
import org.ole.planet.myplanet.model.RealmRetryOperation
1414
import org.ole.planet.myplanet.services.upload.UploadError
1515

1616
@Singleton
1717
class RetryQueue @Inject constructor(
18-
private val databaseService: DatabaseService,
18+
private val retryRepository: RetryRepository,
1919
@ApplicationContext private val context: Context
2020
) {
2121
companion object {
@@ -46,41 +46,16 @@ class RetryQueue @Inject constructor(
4646
return
4747
}
4848

49-
val existingOperation = databaseService.withRealmAsync { realm ->
50-
realm.where(RealmRetryOperation::class.java)
51-
.equalTo("itemId", error.itemId)
52-
.equalTo("uploadType", uploadType)
53-
.notEqualTo("status", RealmRetryOperation.STATUS_COMPLETED)
54-
.notEqualTo("status", RealmRetryOperation.STATUS_ABANDONED)
55-
.findFirst()
56-
?.let { realm.copyFromRealm(it) }
57-
}
49+
val existingOperation = retryRepository.getExistingOperation(error.itemId, uploadType)
5850

5951
if (existingOperation != null) {
60-
databaseService.executeTransactionAsync { realm ->
61-
realm.where(RealmRetryOperation::class.java)
62-
.equalTo("id", existingOperation.id)
63-
.findFirst()?.let { op ->
64-
op.attemptCount += 1
65-
op.lastAttemptTime = System.currentTimeMillis()
66-
op.nextRetryTime = RealmRetryOperation.calculateNextRetryTime(op.attemptCount)
67-
op.errorMessage = error.message
68-
op.httpCode = error.httpCode
69-
70-
if (op.attemptCount >= op.maxAttempts) {
71-
op.status = RealmRetryOperation.STATUS_ABANDONED
72-
Log.w(TAG, "Operation ${op.id} abandoned after ${op.maxAttempts} attempts")
73-
}
74-
}
75-
}
52+
retryRepository.updateAttempt(existingOperation.id, error)
7653
Log.d(TAG, "Updated existing retry operation for item ${error.itemId}")
7754
} else {
78-
databaseService.executeTransactionAsync { realm ->
79-
RealmRetryOperation.createFromUploadError(
80-
realm, uploadType, error, payload.toString(), endpoint,
81-
httpMethod, dbId, modelClassName, userId
82-
)
83-
}
55+
retryRepository.enqueue(
56+
uploadType, error, payload.toString(), endpoint,
57+
httpMethod, dbId, modelClassName, userId
58+
)
8459
Log.i(TAG, "RETRY_QUEUE: Queued new operation - type=$uploadType, itemId=${error.itemId}, error=${error.message}")
8560
}
8661
}
@@ -109,75 +84,32 @@ class RetryQueue @Inject constructor(
10984
}
11085

11186
suspend fun getPendingOperations(): List<RealmRetryOperation> {
112-
return databaseService.withRealmAsync { realm ->
113-
RealmRetryOperation.getPendingOperations(realm)
114-
}
87+
return retryRepository.getPending()
11588
}
11689

11790
suspend fun getPendingCount(): Long {
118-
return databaseService.withRealmAsync { realm ->
119-
RealmRetryOperation.getFailedOperationsCount(realm)
120-
}
91+
return retryRepository.getPendingCount()
12192
}
12293

12394
suspend fun markInProgress(operationId: String) {
124-
databaseService.executeTransactionAsync { realm ->
125-
realm.where(RealmRetryOperation::class.java)
126-
.equalTo("id", operationId)
127-
.findFirst()?.let { op ->
128-
op.status = RealmRetryOperation.STATUS_IN_PROGRESS
129-
}
130-
}
95+
retryRepository.markInProgress(operationId)
13196
}
13297

13398
suspend fun markCompleted(operationId: String) {
134-
databaseService.executeTransactionAsync { realm ->
135-
realm.where(RealmRetryOperation::class.java)
136-
.equalTo("id", operationId)
137-
.findFirst()?.let { op ->
138-
op.status = RealmRetryOperation.STATUS_COMPLETED
139-
op.lastAttemptTime = System.currentTimeMillis()
140-
}
141-
}
99+
retryRepository.markCompleted(operationId)
142100
Log.d(TAG, "Marked operation $operationId as completed")
143101
}
144102

145103
suspend fun markFailed(operationId: String, errorMessage: String?, httpCode: Int?) {
146-
databaseService.executeTransactionAsync { realm ->
147-
realm.where(RealmRetryOperation::class.java)
148-
.equalTo("id", operationId)
149-
.findFirst()?.let { op ->
150-
op.attemptCount += 1
151-
op.lastAttemptTime = System.currentTimeMillis()
152-
op.errorMessage = errorMessage
153-
op.httpCode = httpCode
154-
155-
if (op.attemptCount >= op.maxAttempts) {
156-
op.status = RealmRetryOperation.STATUS_ABANDONED
157-
Log.w(TAG, "Operation $operationId abandoned after ${op.maxAttempts} attempts")
158-
} else {
159-
op.status = RealmRetryOperation.STATUS_PENDING
160-
op.nextRetryTime = RealmRetryOperation.calculateNextRetryTime(op.attemptCount)
161-
}
162-
}
163-
}
104+
retryRepository.markFailed(operationId, errorMessage, httpCode)
164105
}
165106

166107
suspend fun cleanup() {
167-
databaseService.executeTransactionAsync { realm ->
168-
RealmRetryOperation.cleanupCompletedOperations(realm)
169-
}
108+
retryRepository.cleanup()
170109
}
171110

172111
suspend fun resetAllPending() {
173-
databaseService.executeTransactionAsync { realm ->
174-
realm.where(RealmRetryOperation::class.java)
175-
.equalTo("status", RealmRetryOperation.STATUS_PENDING)
176-
.findAll()
177-
.forEach { op ->
178-
op.nextRetryTime = System.currentTimeMillis()
179-
}
180-
}
112+
retryRepository.resetAllPending()
181113
}
182114

183115
/**
@@ -196,15 +128,7 @@ class RetryQueue @Inject constructor(
196128
return@withLock false
197129
}
198130

199-
databaseService.executeTransactionAsync { realm ->
200-
// Only delete pending and abandoned, not in_progress or completed
201-
realm.where(RealmRetryOperation::class.java)
202-
.equalTo("status", RealmRetryOperation.STATUS_PENDING)
203-
.or()
204-
.equalTo("status", RealmRetryOperation.STATUS_ABANDONED)
205-
.findAll()
206-
.deleteAllFromRealm()
207-
}
131+
retryRepository.deletePendingAndAbandonedOperations()
208132
Log.i(TAG, "Queue cleared successfully")
209133
true
210134
}
@@ -215,14 +139,6 @@ class RetryQueue @Inject constructor(
215139
* Called on app startup to recover from crashes.
216140
*/
217141
suspend fun recoverStuckOperations() {
218-
databaseService.executeTransactionAsync { realm ->
219-
realm.where(RealmRetryOperation::class.java)
220-
.equalTo("status", RealmRetryOperation.STATUS_IN_PROGRESS)
221-
.findAll()
222-
.forEach { op ->
223-
op.status = RealmRetryOperation.STATUS_PENDING
224-
op.nextRetryTime = System.currentTimeMillis() + 60_000 // Retry in 1 minute
225-
}
226-
}
142+
retryRepository.recoverStuckOperations()
227143
}
228144
}

0 commit comments

Comments
 (0)