Skip to content

Commit ab2c338

Browse files
authored
Add Cleanup Collection logic for logs_runtime (#3770)
* clean up logs_runtime collection if size exceeded threshold * clean up logs_runtime collection if size exceeded threshold * add logging
1 parent 8ed5602 commit ab2c338

File tree

2 files changed

+77
-8
lines changed

2 files changed

+77
-8
lines changed

libs/utils/src/main/java/com/akto/data_actor/DbLayer.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.akto.bulk_update_util.ApiInfoBulkUpdate;
1919
import com.akto.dao.*;
20+
import com.akto.dao.AccountsContextDao;
2021
import com.akto.dao.AgentTrafficLogDao;
2122
import com.akto.dao.CyborgLogsDao;
2223
import com.akto.dao.filter.MergedUrlsDao;
@@ -137,6 +138,12 @@ public class DbLayer {
137138
private static volatile long mergedUrlsCacheTimestamp = 0;
138139
private static final long MERGED_URLS_CACHE_TTL_MS = 15 * 60 * 1000; // 15 minutes
139140

141+
// Collection cleanup configuration (read by cleanupCollectionIfNeeded)
142+
// Key: "<accountId>_<collectionName>". Value: earliest time, on the Context.now() clock, at which the next size check may run.
private static final ConcurrentHashMap<String, Integer> collectionCleanupCache = new ConcurrentHashMap<>();
143+
private static final int CLEANUP_INTERVAL_SECONDS = 30 * 60; // 30 minutes between size checks for one account + collection
144+
private static final int CLEANUP_JITTER_SECONDS = 3 * 60; // 3 minutes max jitter, added on top of the interval
145+
// A collection whose estimated document count is >= this value is dropped.
private static final long COLLECTION_SIZE_THRESHOLD = 100_000;
146+
140147
private static int getLastUpdatedTsForAccount(int accountId) {
141148
return lastUpdatedTsMap.computeIfAbsent(accountId, k -> 0);
142149
}
@@ -761,7 +768,62 @@ public static void createCollectionForHostAndVpc(String host, int id, String vpc
761768
}
762769

763770
public static void insertRuntimeLog(Log log) {
764-
RuntimeLogsDao.instance.insertOne(log);
771+
RuntimeLogsDao runtimeLogsDao = RuntimeLogsDao.instance;
772+
cleanupCollectionIfNeeded(runtimeLogsDao.getCollName(), runtimeLogsDao);
773+
runtimeLogsDao.insertOne(log);
774+
}
775+
776+
private static void cleanupCollectionIfNeeded(String collectionName, AccountsContextDao<?> dao) {
777+
try {
778+
int accountId = Context.accountId.get();
779+
String cacheKey = accountId + "_" + collectionName;
780+
781+
int currentTime = Context.now();
782+
Integer nextCleanupTime = collectionCleanupCache.get(cacheKey);
783+
784+
if (nextCleanupTime != null && currentTime < nextCleanupTime) {
785+
return;
786+
}
787+
788+
long estimatedCount = dao.getMCollection().estimatedDocumentCount();
789+
790+
if (estimatedCount >= COLLECTION_SIZE_THRESHOLD) {
791+
List<String> slackMessages = new ArrayList<>();
792+
793+
String beforeDropMsg = String.format(
794+
"Dropping collection - collection=%s, account=%d, estimatedCount=%d, threshold=%d, timestamp=%d",
795+
collectionName, accountId, estimatedCount, COLLECTION_SIZE_THRESHOLD, Context.now()
796+
);
797+
loggerMaker.infoAndAddToDb(beforeDropMsg);
798+
slackMessages.add(beforeDropMsg);
799+
800+
long startDropTime = System.currentTimeMillis();
801+
dao.getMCollection().drop();
802+
long endDropTime = System.currentTimeMillis();
803+
804+
String afterDropMsg = String.format(
805+
"Successfully dropped collection=%s, account=%d, timestamp=%d, timeTakenMilliseconds=%d",
806+
collectionName, accountId, endDropTime, endDropTime - startDropTime
807+
);
808+
loggerMaker.infoAndAddToDb(afterDropMsg);
809+
slackMessages.add(afterDropMsg);
810+
811+
// Send combined message to Slack
812+
String combinedMessage = String.join("\n", slackMessages);
813+
loggerMaker.sendCyborgSlackAsync(combinedMessage);
814+
}
815+
816+
// Calculate next cleanup time with jitter (0 to 3 minutes)
817+
int jitter = (int) (Math.random() * CLEANUP_JITTER_SECONDS);
818+
int nextScheduledCleanup = currentTime + CLEANUP_INTERVAL_SECONDS + jitter;
819+
collectionCleanupCache.put(cacheKey, nextScheduledCleanup);
820+
821+
} catch (Exception e) {
822+
loggerMaker.errorAndAddToDb(e,
823+
String.format("Error during cleanup of collection %s, accountId: %s: %s", collectionName,
824+
Context.accountId.get(), e)
825+
);
826+
}
765827
}
766828

767829
public static void insertPersistentRuntimeLog(Log log) {

libs/utils/src/main/java/com/akto/log/LoggerMaker.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,19 @@ protected static void sendToCyborgSlack(String err){
164164
sendToSlack(slackCyborgWebhookUrl, err);
165165
}
166166

167+
public void sendCyborgSlackAsync(String message) {
168+
if (LogDb.DB_ABS != this.db) {
169+
return;
170+
}
171+
service.submit(() -> {
172+
try {
173+
sendToCyborgSlack(message);
174+
} catch (Exception e){
175+
internalLogger.error("Error in sending cyborg logs to Slack: " + e.getMessage());
176+
}
177+
});
178+
}
179+
167180
private String formatWithAccountId(String msg) {
168181
Integer accountId = Context.accountId.get();
169182
if (accountId != null) {
@@ -205,13 +218,7 @@ public void errorAndAddToDb(String err, LogDb db) {
205218
if (db.equals(LogDb.BILLING) || db.equals(LogDb.DASHBOARD)) {
206219
sendToSlack(finalError);
207220
} else if(LogDb.DB_ABS.equals(db)){
208-
service.submit(() -> {
209-
try {
210-
sendToCyborgSlack(finalError);
211-
} catch (Exception e){
212-
internalLogger.error("Error in sending cyborg error logs %s" , e.getMessage());
213-
}
214-
});
221+
sendCyborgSlackAsync(finalError);
215222
}
216223
} catch (Exception e) {
217224
e.printStackTrace();

0 commit comments

Comments
 (0)