Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.akto.bulk_update_util.ApiInfoBulkUpdate;
import com.akto.dao.*;
import com.akto.dao.AccountsContextDao;
import com.akto.dao.AgentTrafficLogDao;
import com.akto.dao.CyborgLogsDao;
import com.akto.dao.filter.MergedUrlsDao;
Expand Down Expand Up @@ -137,6 +138,12 @@ public class DbLayer {
private static volatile long mergedUrlsCacheTimestamp = 0;
private static final long MERGED_URLS_CACHE_TTL_MS = 15 * 60 * 1000; // 15 minutes

// Collection cleanup configuration
private static final ConcurrentHashMap<String, Integer> collectionCleanupCache = new ConcurrentHashMap<>();
private static final int CLEANUP_INTERVAL_SECONDS = 30 * 60; // 30 minutes
private static final int CLEANUP_JITTER_SECONDS = 3 * 60; // 3 minutes max jitter
private static final long COLLECTION_SIZE_THRESHOLD = 100_000;

private static int getLastUpdatedTsForAccount(int accountId) {
return lastUpdatedTsMap.computeIfAbsent(accountId, k -> 0);
}
Expand Down Expand Up @@ -761,7 +768,62 @@ public static void createCollectionForHostAndVpc(String host, int id, String vpc
}

public static void insertRuntimeLog(Log log) {
RuntimeLogsDao.instance.insertOne(log);
RuntimeLogsDao runtimeLogsDao = RuntimeLogsDao.instance;
cleanupCollectionIfNeeded(runtimeLogsDao.getCollName(), runtimeLogsDao);
runtimeLogsDao.insertOne(log);
}

private static void cleanupCollectionIfNeeded(String collectionName, AccountsContextDao<?> dao) {
try {
int accountId = Context.accountId.get();
String cacheKey = accountId + "_" + collectionName;

int currentTime = Context.now();
Integer nextCleanupTime = collectionCleanupCache.get(cacheKey);

if (nextCleanupTime != null && currentTime < nextCleanupTime) {
return;
}

long estimatedCount = dao.getMCollection().estimatedDocumentCount();

if (estimatedCount >= COLLECTION_SIZE_THRESHOLD) {
List<String> slackMessages = new ArrayList<>();

String beforeDropMsg = String.format(
"Dropping collection - collection=%s, account=%d, estimatedCount=%d, threshold=%d, timestamp=%d",
collectionName, accountId, estimatedCount, COLLECTION_SIZE_THRESHOLD, Context.now()
);
loggerMaker.infoAndAddToDb(beforeDropMsg);
slackMessages.add(beforeDropMsg);

long startDropTime = System.currentTimeMillis();
dao.getMCollection().drop();
long endDropTime = System.currentTimeMillis();

String afterDropMsg = String.format(
"Successfully dropped collection=%s, account=%d, timestamp=%d, timeTakenMilliseconds=%d",
collectionName, accountId, endDropTime, endDropTime - startDropTime
);
loggerMaker.infoAndAddToDb(afterDropMsg);
slackMessages.add(afterDropMsg);

// Send combined message to Slack
String combinedMessage = String.join("\n", slackMessages);
loggerMaker.sendCyborgSlackAsync(combinedMessage);
}

// Calculate next cleanup time with jitter (0 to 3 minutes)
int jitter = (int) (Math.random() * CLEANUP_JITTER_SECONDS);
int nextScheduledCleanup = currentTime + CLEANUP_INTERVAL_SECONDS + jitter;
collectionCleanupCache.put(cacheKey, nextScheduledCleanup);

} catch (Exception e) {
loggerMaker.errorAndAddToDb(e,
String.format("Error during cleanup of collection %s, accountId: %s: %s", collectionName,
Context.accountId.get(), e)
);
}
}

public static void insertPersistentRuntimeLog(Log log) {
Expand Down
21 changes: 14 additions & 7 deletions libs/utils/src/main/java/com/akto/log/LoggerMaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ protected static void sendToCyborgSlack(String err){
sendToSlack(slackCyborgWebhookUrl, err);
}

public void sendCyborgSlackAsync(String message) {
if (LogDb.DB_ABS != this.db) {
return;
}
service.submit(() -> {
try {
sendToCyborgSlack(message);
} catch (Exception e){
internalLogger.error("Error in sending cyborg logs to Slack: " + e.getMessage());
}
});
}

private String formatWithAccountId(String msg) {
Integer accountId = Context.accountId.get();
if (accountId != null) {
Expand Down Expand Up @@ -205,13 +218,7 @@ public void errorAndAddToDb(String err, LogDb db) {
if (db.equals(LogDb.BILLING) || db.equals(LogDb.DASHBOARD)) {
sendToSlack(finalError);
} else if(LogDb.DB_ABS.equals(db)){
service.submit(() -> {
try {
sendToCyborgSlack(finalError);
} catch (Exception e){
internalLogger.error("Error in sending cyborg error logs %s" , e.getMessage());
}
});
sendCyborgSlackAsync(finalError);
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
Loading