Skip to content

Commit aa26e4f

Browse files
committed
add max batch size to normal events
1 parent c009e3e commit aa26e4f

3 files changed

Lines changed: 88 additions & 48 deletions

File tree

posthog/src/main/java/com/posthog/internal/PostHogQueue.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val api:
5151
synchronized(dequeLock) {
5252
first = deque.removeFirst()
5353
}
54-
first.delete()
54+
first.deleteSafely(config)
5555
config.logger.log("Queue is full, the oldest event ${first.name} is dropped.")
5656
} catch (ignore: NoSuchElementException) {}
5757
}
@@ -153,7 +153,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val api:
153153
synchronized(dequeLock) {
154154
deque.remove(file)
155155
}
156-
file.delete()
156+
file.deleteSafely(config)
157157
config.logger.log("File: ${file.name} failed to parse: $e.")
158158
}
159159
}
@@ -180,7 +180,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val api:
180180
}
181181

182182
files.forEach {
183-
it.delete()
183+
it.deleteSafely(config)
184184
}
185185
}
186186
}
@@ -276,7 +276,7 @@ internal class PostHogQueue(private val config: PostHogConfig, private val api:
276276
deque.clear()
277277
}
278278
tempFiles.forEach {
279-
it.delete()
279+
it.deleteSafely(config)
280280
}
281281
}
282282
}

posthog/src/main/java/com/posthog/internal/PostHogSendCachedEventsIntegration.kt

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,12 @@ internal class PostHogSendCachedEventsIntegration(private val config: PostHogCon
2525
executor.shutdown()
2626
}
2727

28-
// TODO: respect maxBatchSize
29-
3028
private fun flushLegacyEvents() {
3129
config.legacyStoragePrefix?.let {
3230
val legacyDir = File(it)
3331
val legacyFile = File(legacyDir, "${config.apiKey}.tmp")
3432

35-
if (!legacyFile.exists()) {
33+
if (!legacyFile.existsSafely(config)) {
3634
return
3735
}
3836

@@ -87,12 +85,11 @@ internal class PostHogSendCachedEventsIntegration(private val config: PostHogCon
8785
val event = serializer.deserializeEvent(inputStream.reader().buffered())
8886
event?.let {
8987
events.add(event)
88+
eventsCount++
9089
}
9190
} catch (e: Throwable) {
9291
iterator.remove()
9392
config.logger.log("Event failed to parse: $e.")
94-
} finally {
95-
eventsCount++
9693
}
9794
// stop the while loop since the batch is full
9895
if (events.size >= config.maxBatchSize) {
@@ -121,7 +118,9 @@ internal class PostHogSendCachedEventsIntegration(private val config: PostHogCon
121118
try {
122119
legacy.remove()
123120
} catch (e: NoSuchElementException) {
124-
legacyFile.delete()
121+
// this should not happen but even if it does,
122+
// we delete the queue file then
123+
legacyFile.deleteSafely(config)
125124
} catch (e: Throwable) {
126125
config.logger.log("Error deleting file: $e.")
127126
}
@@ -140,55 +139,77 @@ internal class PostHogSendCachedEventsIntegration(private val config: PostHogCon
140139
config.storagePrefix?.let {
141140
val dir = File(it, config.apiKey)
142141

143-
if (!dir.exists()) {
142+
if (!dir.existsSafely(config)) {
144143
return
145144
}
145+
try {
146+
// so that we don't try to send events in this batch that is already in the queue
147+
// but just cached events
148+
val time = startDate.time
149+
val fileFilter = FileFilter { file -> file.lastModified() <= time }
146150

147-
// so that we don't try to send events in this batch that is already in the queue
148-
// but just cached events
149-
val time = startDate.time
150-
val fileFilter = FileFilter { file -> file.lastModified() <= time }
151+
val listFiles = (dir.listFiles(fileFilter) ?: emptyArray()).toMutableList()
151152

152-
val listFiles = (dir.listFiles(fileFilter) ?: emptyArray()).toMutableList()
153-
val events = mutableListOf<PostHogEvent>()
154-
val iterator = listFiles.iterator()
153+
while (listFiles.isNotEmpty()) {
154+
val events = mutableListOf<PostHogEvent>()
155+
val iterator = listFiles.iterator()
156+
var eventsCount = 0
155157

156-
while (iterator.hasNext()) {
157-
val file = iterator.next()
158+
while (iterator.hasNext()) {
159+
val file = iterator.next()
158160

159-
try {
160-
val inputStream = config.encryption?.decrypt(file.inputStream()) ?: file.inputStream()
161-
val event = serializer.deserializeEvent(inputStream.reader().buffered())
162-
event?.let {
163-
events.add(event)
164-
}
165-
} catch (e: Throwable) {
166-
iterator.remove()
167-
file.delete()
168-
config.logger.log("File: ${file.name} failed to parse: $e.")
169-
}
170-
}
161+
try {
162+
val inputStream =
163+
config.encryption?.decrypt(file.inputStream()) ?: file.inputStream()
164+
val event = serializer.deserializeEvent(inputStream.reader().buffered())
165+
event?.let {
166+
events.add(event)
167+
eventsCount++
168+
}
169+
} catch (e: Throwable) {
170+
config.logger.log("File: ${file.name} failed to parse: $e.")
171+
iterator.remove()
172+
file.deleteSafely(config)
173+
}
171174

172-
if (events.isNotEmpty()) {
173-
var deleteFiles = true
174-
try {
175-
api.batch(events)
176-
} catch (e: PostHogApiError) {
177-
if (e.statusCode < 400) {
178-
deleteFiles = false
179-
}
180-
} catch (e: IOException) {
181-
// no connection should try again
182-
if (e.isNetworkingError()) {
183-
deleteFiles = false
175+
// stop the while loop since the batch is full
176+
if (events.size >= config.maxBatchSize) {
177+
break
178+
}
184179
}
185-
} finally {
186-
if (deleteFiles) {
187-
listFiles.forEach { file ->
188-
file.delete()
180+
181+
if (events.isNotEmpty()) {
182+
var deleteFiles = true
183+
try {
184+
api.batch(events)
185+
} catch (e: PostHogApiError) {
186+
if (e.statusCode < 400) {
187+
deleteFiles = false
188+
}
189+
throw e
190+
} catch (e: IOException) {
191+
// no connection should try again
192+
if (e.isNetworkingError()) {
193+
deleteFiles = false
194+
}
195+
throw e
196+
} finally {
197+
if (deleteFiles) {
198+
for (i in 1..eventsCount) {
199+
var file: File? = null
200+
try {
201+
file = listFiles.removeFirst()
202+
file.deleteSafely(config)
203+
} catch (e: Throwable) {
204+
config.logger.log("Failed to remove file: ${file?.name}: $e.")
205+
}
206+
}
207+
}
189208
}
190209
}
191210
}
211+
} catch (e: Throwable) {
212+
config.logger.log("Flushing events failed: $e.")
192213
}
193214
}
194215
}

posthog/src/main/java/com/posthog/internal/PostHogUtils.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.posthog.internal
22

3+
import com.posthog.PostHogConfig
4+
import java.io.File
35
import java.io.IOException
46
import java.io.InterruptedIOException
57
import java.net.SocketTimeoutException
@@ -23,3 +25,20 @@ internal fun Throwable.isNetworkingError(): Boolean {
2325
noInternetAvailable(this) ||
2426
isRequestCanceled(this)
2527
}
28+
29+
internal fun File.deleteSafely(config: PostHogConfig) {
30+
try {
31+
delete()
32+
} catch (e: Throwable) {
33+
config.logger.log("Error deleting the file $name: $e.")
34+
}
35+
}
36+
37+
internal fun File.existsSafely(config: PostHogConfig): Boolean {
38+
return try {
39+
exists()
40+
} catch (e: Throwable) {
41+
config.logger.log("Error deleting the file $name: $e.")
42+
false
43+
}
44+
}

0 commit comments

Comments
 (0)