22
33import com .akto .log .LoggerMaker ;
44import com .akto .dao .context .Context ;
5- import com .akto .threat .backend .dao .ArchivedMaliciousEventDao ;
65import com .akto .threat .backend .dao .MaliciousEventDao ;
76import com .akto .threat .backend .dao .ThreatConfigurationDao ;
87import com .mongodb .client .MongoClient ;
@@ -29,8 +28,8 @@ public class ArchiveOldMaliciousEventsCron implements Runnable {
2928 private static final long DEFAULT_RETENTION_DAYS = 60L ; // default, can be overridden from DB
3029 private static final long MIN_RETENTION_DAYS = 30L ;
3130 private static final long MAX_RETENTION_DAYS = 90L ;
32- private static final long MAX_SOURCE_DOCS = 400_000L ; // cap size
3331 private static final long MAX_DELETES_PER_ITERATION = 100_000L ; // cap per cron iteration
32+ private static final long MAX_SOURCE_DOCS = 1000_000L ;
3433
3534 private final MongoClient mongoClient ;
3635 private final ScheduledExecutorService scheduler ;
@@ -97,19 +96,18 @@ private boolean shouldSkipDatabase(String dbName) {
9796 private void archiveOldMaliciousEvents (String dbName , long nowSeconds ) {
9897 String accountId = dbName ;
9998
100- // Check if archival is enabled for this account
101- if (!isArchivalEnabled (accountId )) {
102- logger .infoAndAddToDb ("Archival is disabled for account " + accountId + ", skipping" , LoggerMaker .LogDb .RUNTIME );
99+ // Check if deletion is enabled for this account
100+ if (!isDeletionEnabled (accountId )) {
101+ logger .infoAndAddToDb ("Deletion is disabled for account " + accountId + ", skipping" , LoggerMaker .LogDb .RUNTIME );
103102 return ;
104103 }
105104
106105 long retentionDays = fetchRetentionDays (accountId );
107106 long threshold = nowSeconds - (retentionDays * 24 * 60 * 60 );
108107
109108 MongoCollection <Document > source = MaliciousEventDao .instance .getDocumentCollection (accountId );
110- MongoCollection <Document > dest = ArchivedMaliciousEventDao .instance .getCollection (accountId );
111109
112- int totalMoved = 0 ;
110+ int totalDeleted = 0 ;
113111 long deletesThisIteration = 0L ;
114112
115113 while (true ) {
@@ -132,14 +130,12 @@ private void archiveOldMaliciousEvents(String dbName, long nowSeconds) {
132130 ids .add (doc .get ("_id" ));
133131 }
134132
135- asyncUpsertToArchive (batch , accountId );
136-
137133 long deleted = deleteByIds (source , ids , accountId );
138- totalMoved += (int ) deleted ;
134+ totalDeleted += (int ) deleted ;
139135 deletesThisIteration += deleted ;
140136
141137 long iterationElapsedMs = (System .nanoTime () - iterationStartNanos ) / 1_000_000L ;
142- logger .infoAndAddToDb ("Archive loop iteration in db " + dbName + ": batch=" + batch .size () + ", deleted=" + deleted + ", tookMs=" + iterationElapsedMs , LoggerMaker .LogDb .RUNTIME );
138+ logger .infoAndAddToDb ("Delete loop iteration in db " + dbName + ": batch=" + batch .size () + ", deleted=" + deleted + ", tookMs=" + iterationElapsedMs , LoggerMaker .LogDb .RUNTIME );
143139
144140 if (batch .size () < BATCH_SIZE ) {
145141 break ;
@@ -151,14 +147,13 @@ private void archiveOldMaliciousEvents(String dbName, long nowSeconds) {
151147 }
152148 }
153149
154- if (totalMoved > 0 ) {
155- logger .infoAndAddToDb ("Completed archiving for db " + dbName + ", total moved : " + totalMoved , LoggerMaker .LogDb .RUNTIME );
150+ if (totalDeleted > 0 ) {
151+ logger .infoAndAddToDb ("Completed deletion for db " + dbName + ", total deleted : " + totalDeleted , LoggerMaker .LogDb .RUNTIME );
156152 }
157153
158- // Enforce collection size cap by trimming oldest docs beyond 400k.
159154 try {
160155 if (deletesThisIteration < MAX_DELETES_PER_ITERATION ) {
161- trimCollectionIfExceedsCap (accountId , source , dest );
156+ trimCollectionIfExceedsCap (accountId , source );
162157 } else {
163158 logger .infoAndAddToDb ("Skipping trim step as delete cap reached in db " + dbName , LoggerMaker .LogDb .RUNTIME );
164159 }
@@ -167,7 +162,7 @@ private void archiveOldMaliciousEvents(String dbName, long nowSeconds) {
167162 }
168163 }
169164
170- private boolean isArchivalEnabled (String accountId ) {
165+ private boolean isDeletionEnabled (String accountId ) {
171166 try {
172167 Document doc = ThreatConfigurationDao .instance .getCollection (accountId ).find ().first ();
173168 if (doc == null ) return false ; // disabled by default
@@ -199,7 +194,7 @@ private long fetchRetentionDays(String accountId) {
199194 return DEFAULT_RETENTION_DAYS ;
200195 }
201196
202- private void trimCollectionIfExceedsCap (String accountId , MongoCollection <Document > source , MongoCollection < Document > dest ) {
197+ private void trimCollectionIfExceedsCap (String accountId , MongoCollection <Document > source ) {
203198 long approxCount = source .countDocuments ();
204199
205200 if (approxCount <= MAX_SOURCE_DOCS ) return ;
@@ -223,8 +218,6 @@ private void trimCollectionIfExceedsCap(String accountId, MongoCollection<Docume
223218
224219 if (oldestDocs .isEmpty ()) break ;
225220
226- asyncUpsertToArchive (oldestDocs , accountId );
227-
228221 Set <Object > ids = new HashSet <>();
229222 for (Document d : oldestDocs ) {
230223 ids .add (d .get ("_id" ));
@@ -242,19 +235,6 @@ private void trimCollectionIfExceedsCap(String accountId, MongoCollection<Docume
242235 }
243236 }
244237
245- private void asyncUpsertToArchive (List <Document > docs , String accountId ) {
246- if (docs == null || docs .isEmpty ()) return ;
247- final List <Document > docsSnapshot = new ArrayList <>(docs );
248- final String accountIdFinal = accountId ;
249- this .scheduler .submit (() -> {
250- try {
251- ArchivedMaliciousEventDao .instance .bulkInsert (accountIdFinal , docsSnapshot );
252- } catch (Exception e ) {
253- logger .errorAndAddToDb ("Async error writing archive batch in account " + accountIdFinal + ": " + e .getMessage (), LoggerMaker .LogDb .RUNTIME );
254- }
255- });
256- }
257-
258238 private long deleteByIds (MongoCollection <Document > source , Set <Object > ids , String accountId ) {
259239 if (ids == null || ids .isEmpty ()) return 0L ;
260240 try {
0 commit comments