Skip to content

Commit e7b1f0d

Browse files
authored
Merge pull request #3777 from akto-api-security/feature/logs_async
Feature/logs async
2 parents 61c6703 + 69ba298 commit e7b1f0d

File tree

3 files changed

+52
-62
lines changed

3 files changed

+52
-62
lines changed

.cursor/rules/akto-core-rules.mdc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
alwaysApply: true
3+
---
4+
5+
- DO NOT REPEAT CODE, EVER
6+
- IF ANY CODE/FUNCTION/BLOCK CAN BE REUSED, REUSE IT.
7+
- IF ANY PIECE OF CODE HAS BEEN CREATED BY YOU, BUT NOT BEING USED, DELETE IT.

apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ private static void bulkParseTrafficToResponseParams(long lastSyncOffset, Consum
577577
printL(r.value());
578578
if(LOG_DEBUG_RECORDS > 0){
579579
LOG_DEBUG_RECORDS--;
580-
loggerMaker.infoAndAddToDb("Kafka record recieved" + r.value());
580+
loggerMaker.infoAndAddToDb("Kafka record received" + r.value());
581581
}
582582
AllMetrics.instance.setRuntimeKafkaRecordCount(1);
583583
AllMetrics.instance.setRuntimeKafkaRecordSize(r.value().length());
@@ -601,7 +601,7 @@ private static void bulkParseTrafficToResponseParams(long lastSyncOffset, Consum
601601

602602
httpResponseParams = HttpCallParser.parseKafkaMessage(r.value());
603603
if (httpResponseParams == null) {
604-
loggerMaker.error("httpresponse params was skipped due to invalid json requestBody");
604+
loggerMaker.error("HttpResponse params was skipped due to invalid json requestBody");
605605
continue;
606606
}
607607

@@ -759,8 +759,6 @@ private static void sendToCentralKafka(String centralKafkaTopicName, List<HttpRe
759759
}
760760
}
761761

762-
763-
764762
private static AccountInfo refreshAccountInfo(Map<Integer, AccountInfo> accountInfoMap, int accountIdInt) {
765763
AccountInfo accountInfo = accountInfoMap.computeIfAbsent(accountIdInt, k -> new AccountInfo());
766764

@@ -817,8 +815,8 @@ public static List<HttpResponseParams> filterBasedOnHeaders(List<HttpResponsePar
817815
accWiseResponse = accWiseResponseFiltered;
818816
}
819817

820-
Map<String, Map<Pattern, String>> apiCollectioNameMapper = accountSettings.convertApiCollectionNameMapperToRegex();
821-
changeTargetCollection(apiCollectioNameMapper, accWiseResponse);
818+
Map<String, Map<Pattern, String>> apiCollectionNameMapper = accountSettings.convertApiCollectionNameMapperToRegex();
819+
changeTargetCollection(apiCollectionNameMapper, accWiseResponse);
822820
}
823821

824822
return accWiseResponse;

libs/utils/src/main/java/com/akto/data_actor/ClientActor.java

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,20 @@ public class ClientActor extends DataActor {
9898
private static final CodecRegistry codecRegistry = DaoInit.createCodecRegistry();
9999
public static final String CYBORG_URL = "https://cyborg.akto.io";
100100
private static ExecutorService threadPool = Executors.newFixedThreadPool(maxConcurrentBatchWrites);
101+
private static final int maxConcurrentLogWrites = getMaxConcurrentLogWrites();
102+
final private static ExecutorService logThreadPool = Executors.newFixedThreadPool(maxConcurrentLogWrites);
103+
104+
private static int getMaxConcurrentLogWrites() {
105+
String envValue = System.getenv("MAX_CONCURRENT_LOG_WRITES");
106+
if (envValue != null) {
107+
try {
108+
return Integer.parseInt(envValue);
109+
} catch (NumberFormatException e) {
110+
// fall through to default
111+
}
112+
}
113+
return 50;
114+
}
101115

102116
/**
103117
* Dedicated thread pool for agent traffic log HTTP writes.
@@ -1257,44 +1271,37 @@ public void createCollectionSimpleForVpc(int vxlanId, String vpcId, List<Collect
12571271
}
12581272
}
12591273

1260-
public void insertRuntimeLog(Log log) {
1261-
Map<String, List<String>> headers = buildHeaders();
1262-
BasicDBObject obj = new BasicDBObject();
1263-
BasicDBObject logObj = new BasicDBObject();
1264-
logObj.put("key", log.getKey());
1265-
logObj.put("log", log.getLog());
1266-
logObj.put("timestamp", log.getTimestamp());
1267-
obj.put("log", logObj);
1268-
OriginalHttpRequest request = new OriginalHttpRequest(url + "/insertRuntimeLog", "", "POST", obj.toString(), headers, "");
1269-
try {
1270-
OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
1271-
String responsePayload = response.getBody();
1272-
if (response.getStatusCode() != 200 || responsePayload == null) {
1273-
loggerMaker.info("non 2xx response in insertRuntimeLog");
1274-
return;
1274+
private void insertLogAsync(Log log, String endpoint) {
1275+
logThreadPool.submit(() -> {
1276+
try {
1277+
Map<String, List<String>> headers = buildHeaders();
1278+
BasicDBObject obj = new BasicDBObject();
1279+
BasicDBObject logObj = new BasicDBObject();
1280+
logObj.put("key", log.getKey());
1281+
logObj.put("log", log.getLog());
1282+
logObj.put("timestamp", log.getTimestamp());
1283+
obj.put("log", logObj);
1284+
OriginalHttpRequest request = new OriginalHttpRequest(url + endpoint, "", "POST", obj.toString(), headers, "");
1285+
OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
1286+
if (response.getStatusCode() != 200 || response.getBody() == null) {
1287+
loggerMaker.info("non 2xx response in " + endpoint);
1288+
}
1289+
} catch (Exception e) {
1290+
loggerMaker.error("error in " + endpoint + ": " + e);
12751291
}
1276-
} catch (Exception e) {
1277-
loggerMaker.error("error in insertRuntimeLog" + e);
1278-
return;
1279-
}
1292+
});
1293+
}
1294+
1295+
public void insertRuntimeLog(Log log) {
1296+
insertLogAsync(log, "/insertRuntimeLog");
12801297
}
12811298

12821299
public void insertAnalyserLog(Log log) {
1283-
Map<String, List<String>> headers = buildHeaders();
1284-
BasicDBObject obj = new BasicDBObject();
1285-
obj.put("log", log);
1286-
OriginalHttpRequest request = new OriginalHttpRequest(url + "/insertAnalyserLog", "", "POST", obj.toString(), headers, "");
1287-
try {
1288-
OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
1289-
String responsePayload = response.getBody();
1290-
if (response.getStatusCode() != 200 || responsePayload == null) {
1291-
loggerMaker.errorAndAddToDb("non 2xx response in insertAnalyserLog", LoggerMaker.LogDb.RUNTIME);
1292-
return;
1293-
}
1294-
} catch (Exception e) {
1295-
loggerMaker.errorAndAddToDb("error in insertAnalyserLog" + e, LoggerMaker.LogDb.RUNTIME);
1296-
return;
1297-
}
1300+
insertLogAsync(log, "/insertAnalyserLog");
1301+
}
1302+
1303+
public void insertTestingLog(Log log) {
1304+
insertLogAsync(log, "/insertTestingLog");
12981305
}
12991306

13001307
public void modifyHybridSaasSetting(boolean isHybridSaas) {
@@ -3313,28 +3320,6 @@ public void modifyHybridTestingSetting(boolean hybridTestingEnabled) {
33133320
}
33143321
}
33153322

3316-
public void insertTestingLog(Log log) {
3317-
Map<String, List<String>> headers = buildHeaders();
3318-
BasicDBObject obj = new BasicDBObject();
3319-
BasicDBObject logObj = new BasicDBObject();
3320-
logObj.put("key", log.getKey());
3321-
logObj.put("log", log.getLog());
3322-
logObj.put("timestamp", log.getTimestamp());
3323-
obj.put("log", logObj);
3324-
OriginalHttpRequest request = new OriginalHttpRequest(url + "/insertTestingLog", "", "POST", obj.toString(), headers, "");
3325-
try {
3326-
OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
3327-
String responsePayload = response.getBody();
3328-
if (response.getStatusCode() != 200 || responsePayload == null) {
3329-
loggerMaker.info("non 2xx response in insertTestingLog");
3330-
return;
3331-
}
3332-
} catch (Exception e) {
3333-
loggerMaker.error("error in insertTestingLog" + e);
3334-
return;
3335-
}
3336-
}
3337-
33383323
public void bulkWriteDependencyNodes(List<DependencyNode> dependencyNodeList) {
33393324
BasicDBObject obj = new BasicDBObject();
33403325
obj.put("dependencyNodeList", dependencyNodeList);

0 commit comments

Comments
 (0)