Skip to content

Commit 8fdd23c

Browse files
committed
fix: issues with job scheduler
1 parent bf6fbec commit 8fdd23c

File tree

8 files changed

+547
-167
lines changed

8 files changed

+547
-167
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ https:
2626
model.onnx
2727
test_scripts/*
2828
run-master.sh
29-
run-threat-consumer.sh
29+
run-threat-consumer.sh
30+
run-account-jobs.sh

apps/account-job-executor/src/main/java/com/akto/account_job_executor/client/CyborgApiClient.java

Lines changed: 76 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package com.akto.account_job_executor.client;
22

3-
import com.akto.dto.OriginalHttpRequest;
4-
import com.akto.dto.OriginalHttpResponse;
53
import com.akto.dto.jobs.AccountJob;
64
import com.akto.dto.jobs.JobStatus;
75
import com.akto.dto.jobs.ScheduleType;
86
import com.akto.log.LoggerMaker;
9-
import com.akto.testing.ApiExecutor;
107
import com.fasterxml.jackson.databind.DeserializationFeature;
118
import com.fasterxml.jackson.databind.ObjectMapper;
129
import com.mongodb.BasicDBObject;
10+
import org.apache.http.client.methods.CloseableHttpResponse;
11+
import org.apache.http.client.methods.HttpPost;
12+
import org.apache.http.entity.StringEntity;
13+
import org.apache.http.impl.client.CloseableHttpClient;
14+
import org.apache.http.impl.client.HttpClients;
15+
import org.apache.http.util.EntityUtils;
1316
import org.bson.types.ObjectId;
1417

15-
import java.util.Collections;
18+
import java.io.IOException;
1619
import java.util.HashMap;
17-
import java.util.List;
1820
import java.util.Map;
1921

2022
/**
@@ -39,21 +41,51 @@ private static String buildCyborgUrl() {
3941
if (dbAbsHost.endsWith("/")) {
4042
dbAbsHost = dbAbsHost.substring(0, dbAbsHost.length() - 1);
4143
}
42-
logger.info("Cyborg URL configured: {}", dbAbsHost + "/api");
44+
// Don't use logger during static initialization - can cause NullPointerException
45+
System.out.println("[CyborgApiClient] Cyborg URL configured: " + dbAbsHost + "/api");
4346
return dbAbsHost + "/api";
4447
}
4548

4649
/**
47-
* Build HTTP headers with JWT authentication token.
50+
* Get JWT authentication token from environment.
4851
*/
49-
private static Map<String, List<String>> buildHeaders() {
52+
private static String getAuthToken() {
5053
String token = System.getenv("DATABASE_ABSTRACTOR_SERVICE_TOKEN");
5154
if (token == null || token.isEmpty()) {
5255
throw new IllegalStateException("DATABASE_ABSTRACTOR_SERVICE_TOKEN environment variable not set");
5356
}
54-
Map<String, List<String>> headers = new HashMap<>();
55-
headers.put("Authorization", Collections.singletonList(token));
56-
return headers;
57+
return token;
58+
}
59+
60+
/**
61+
* Make HTTP POST request to Cyborg API using Apache HttpClient.
62+
* This bypasses ApiExecutor's SSRF protection since we're making trusted internal calls.
63+
*/
64+
private static String makePostRequest(String endpoint, Map<String, Object> requestBody) throws IOException {
65+
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
66+
HttpPost httpPost = new HttpPost(url + endpoint);
67+
68+
// Set headers
69+
httpPost.setHeader("Authorization", getAuthToken());
70+
httpPost.setHeader("Content-Type", "application/json");
71+
72+
// Set body
73+
String jsonBody = mapper.writeValueAsString(requestBody);
74+
httpPost.setEntity(new StringEntity(jsonBody));
75+
76+
// Execute request
77+
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
78+
int statusCode = response.getStatusLine().getStatusCode();
79+
String responseBody = EntityUtils.toString(response.getEntity());
80+
81+
if (statusCode != 200) {
82+
logger.error("API request failed. Status: {}, Body: {}", statusCode, responseBody);
83+
throw new IOException("API request failed with status: " + statusCode);
84+
}
85+
86+
return responseBody;
87+
}
88+
}
5789
}
5890

5991
/**
@@ -65,29 +97,16 @@ private static Map<String, List<String>> buildHeaders() {
6597
* @return Claimed AccountJob or null if no jobs available
6698
*/
6799
public static AccountJob fetchAndClaimJob(long now, int heartbeatThreshold) {
100+
logger.info("=== fetchAndClaimJob called: now={}, threshold={} ===", now, heartbeatThreshold);
68101
try {
69-
Map<String, List<String>> headers = buildHeaders();
70-
BasicDBObject requestBody = new BasicDBObject();
102+
Map<String, Object> requestBody = new HashMap<>();
71103
requestBody.put("now", now);
72104
requestBody.put("heartbeatThreshold", heartbeatThreshold);
73105

74-
OriginalHttpRequest request = new OriginalHttpRequest(
75-
url + "/fetchAndClaimAccountJob",
76-
"", "POST",
77-
requestBody.toString(),
78-
headers, ""
79-
);
80-
81-
logger.debug("Fetching and claiming job from Cyborg API");
82-
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
106+
logger.info("Fetching and claiming job from Cyborg API: now={}, threshold={}", now, heartbeatThreshold);
107+
String body = makePostRequest("/fetchAndClaimAccountJob", requestBody);
83108

84-
if (response.getStatusCode() != 200) {
85-
logger.error("Failed to fetch job. Status: {}, Body: {}",
86-
response.getStatusCode(), response.getBody());
87-
return null;
88-
}
89-
90-
String body = response.getBody();
109+
logger.info("API response received, body length: {}", body != null ? body.length() : 0);
91110
if (body == null || body.isEmpty() || body.equals("null") || body.equals("{}")) {
92111
logger.debug("No jobs available to claim");
93112
return null;
@@ -107,7 +126,8 @@ public static AccountJob fetchAndClaimJob(long now, int heartbeatThreshold) {
107126
return job;
108127

109128
} catch (Exception e) {
110-
logger.error("Error fetching and claiming job", e);
129+
logger.error("!!! EXCEPTION in fetchAndClaimJob: {}", e.getMessage(), e);
130+
e.printStackTrace();
111131
return null;
112132
}
113133
}
@@ -123,31 +143,17 @@ public static AccountJob fetchAndClaimJob(long now, int heartbeatThreshold) {
123143
*/
124144
public static void updateJobStatus(ObjectId id, JobStatus status, Integer finishedAt, String error) {
125145
try {
126-
Map<String, List<String>> headers = buildHeaders();
127-
BasicDBObject requestBody = new BasicDBObject();
146+
Map<String, Object> requestBody = new HashMap<>();
128147
requestBody.put("jobId", id.toString());
129148
requestBody.put("status", status.name());
130149
requestBody.put("finishedAt", finishedAt);
131150
if (error != null) {
132151
requestBody.put("error", error);
133152
}
134153

135-
OriginalHttpRequest request = new OriginalHttpRequest(
136-
url + "/updateAccountJobStatus",
137-
"", "POST",
138-
requestBody.toString(),
139-
headers, ""
140-
);
141-
142154
logger.debug("Updating job status: jobId={}, status={}", id, status);
143-
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
144-
145-
if (response.getStatusCode() != 200) {
146-
logger.error("Failed to update job status. Status: {}, Body: {}",
147-
response.getStatusCode(), response.getBody());
148-
} else {
149-
logger.debug("Successfully updated job status: jobId={}, status={}", id, status);
150-
}
155+
makePostRequest("/updateAccountJobStatus", requestBody);
156+
logger.debug("Successfully updated job status: jobId={}, status={}", id, status);
151157

152158
} catch (Exception e) {
153159
logger.error("Error updating job status", e);
@@ -162,26 +168,12 @@ public static void updateJobStatus(ObjectId id, JobStatus status, Integer finish
162168
*/
163169
public static void updateJobHeartbeat(ObjectId id) {
164170
try {
165-
Map<String, List<String>> headers = buildHeaders();
166-
BasicDBObject requestBody = new BasicDBObject();
171+
Map<String, Object> requestBody = new HashMap<>();
167172
requestBody.put("jobId", id.toString());
168173

169-
OriginalHttpRequest request = new OriginalHttpRequest(
170-
url + "/updateAccountJobHeartbeat",
171-
"", "POST",
172-
requestBody.toString(),
173-
headers, ""
174-
);
175-
176174
logger.debug("Updating job heartbeat: jobId={}", id);
177-
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
178-
179-
if (response.getStatusCode() != 200) {
180-
logger.error("Failed to update job heartbeat. Status: {}, Body: {}",
181-
response.getStatusCode(), response.getBody());
182-
} else {
183-
logger.debug("Successfully updated job heartbeat: jobId={}", id);
184-
}
175+
makePostRequest("/updateAccountJobHeartbeat", requestBody);
176+
logger.debug("Successfully updated job heartbeat: jobId={}", id);
185177

186178
} catch (Exception e) {
187179
logger.error("Error updating job heartbeat", e);
@@ -197,27 +189,13 @@ public static void updateJobHeartbeat(ObjectId id) {
197189
*/
198190
public static void updateJob(ObjectId id, Map<String, Object> updates) {
199191
try {
200-
Map<String, List<String>> headers = buildHeaders();
201-
BasicDBObject requestBody = new BasicDBObject();
192+
Map<String, Object> requestBody = new HashMap<>();
202193
requestBody.put("jobId", id.toString());
203194
requestBody.put("updates", updates);
204195

205-
OriginalHttpRequest request = new OriginalHttpRequest(
206-
url + "/updateAccountJob",
207-
"", "POST",
208-
requestBody.toString(),
209-
headers, ""
210-
);
211-
212196
logger.debug("Updating job: jobId={}, updates={}", id, updates);
213-
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
214-
215-
if (response.getStatusCode() != 200) {
216-
logger.error("Failed to update job. Status: {}, Body: {}",
217-
response.getStatusCode(), response.getBody());
218-
} else {
219-
logger.debug("Successfully updated job: jobId={}", id);
220-
}
197+
makePostRequest("/updateAccountJob", requestBody);
198+
logger.debug("Successfully updated job: jobId={}", id);
221199

222200
} catch (Exception e) {
223201
logger.error("Error updating job", e);
@@ -232,27 +210,11 @@ public static void updateJob(ObjectId id, Map<String, Object> updates) {
232210
*/
233211
public static AccountJob findById(ObjectId id) {
234212
try {
235-
Map<String, List<String>> headers = buildHeaders();
236-
BasicDBObject requestBody = new BasicDBObject();
213+
Map<String, Object> requestBody = new HashMap<>();
237214
requestBody.put("jobId", id.toString());
238215

239-
OriginalHttpRequest request = new OriginalHttpRequest(
240-
url + "/fetchAccountJob",
241-
"", "POST",
242-
requestBody.toString(),
243-
headers, ""
244-
);
245-
246216
logger.debug("Fetching job by ID: jobId={}", id);
247-
OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
248-
249-
if (response.getStatusCode() != 200) {
250-
logger.error("Failed to fetch job. Status: {}, Body: {}",
251-
response.getStatusCode(), response.getBody());
252-
return null;
253-
}
254-
255-
String body = response.getBody();
217+
String body = makePostRequest("/fetchAccountJob", requestBody);
256218
if (body == null || body.isEmpty() || body.equals("null") || body.equals("{}")) {
257219
logger.debug("Job not found: jobId={}", id);
258220
return null;
@@ -286,14 +248,25 @@ public static AccountJob findById(ObjectId id) {
286248
private static AccountJob convertMapToAccountJob(Map<String, Object> map) {
287249
AccountJob job = new AccountJob();
288250

289-
// Handle ObjectId
290-
if (map.get("_id") != null) {
291-
Object idObj = map.get("_id");
251+
// Handle ObjectId - check both "_id" and "id" fields
252+
Object idObj = map.get("_id");
253+
if (idObj == null) {
254+
idObj = map.get("id"); // Try "id" without underscore
255+
}
256+
257+
if (idObj != null) {
292258
if (idObj instanceof String) {
293259
job.setId(new ObjectId((String) idObj));
294260
} else if (idObj instanceof Map) {
295261
Map<String, Object> idMap = (Map<String, Object>) idObj;
296-
job.setId(new ObjectId((String) idMap.get("$oid")));
262+
// Handle different ObjectId formats: {"$oid": "..."} or {"timestamp": ..., "date": ...}
263+
if (idMap.containsKey("$oid")) {
264+
job.setId(new ObjectId((String) idMap.get("$oid")));
265+
} else if (idMap.containsKey("timestamp")) {
266+
// Create ObjectId from timestamp
267+
int timestamp = getIntValue(idMap, "timestamp");
268+
job.setId(new ObjectId(timestamp, 0));
269+
}
297270
}
298271
}
299272

apps/account-job-executor/src/main/java/com/akto/account_job_executor/cron/AccountJobsCron.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class AccountJobsCron {
2121

2222
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
2323
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
24-
private static final int MAX_HEARTBEAT_THRESHOLD_SECONDS = 300;
24+
private static final int MAX_HEARTBEAT_THRESHOLD_SECONDS = 5;
2525

2626
private static final LoggerMaker logger = new LoggerMaker(AccountJobsCron.class);
2727

@@ -39,16 +39,19 @@ public void startScheduler() {
3939
scheduler.scheduleAtFixedRate(() -> {
4040
try {
4141
long now = Context.now();
42+
logger.info("Polling for jobs: now={}, threshold={}", now, MAX_HEARTBEAT_THRESHOLD_SECONDS);
4243

4344
// Fetch and claim one job atomically via Cyborg API
4445
// This returns either:
4546
// 1. A SCHEDULED job where scheduledAt < now
4647
// 2. A RUNNING job where heartbeatAt < now - 300s (stale job)
4748
// 3. null if no jobs available
49+
logger.info("About to call CyborgApiClient.fetchAndClaimJob()...");
4850
AccountJob job = CyborgApiClient.fetchAndClaimJob(now, MAX_HEARTBEAT_THRESHOLD_SECONDS);
51+
logger.info("CyborgApiClient.fetchAndClaimJob() returned: {}", job != null ? "job found" : "null");
4952

5053
if (job == null) {
51-
logger.debug("No jobs to run");
54+
logger.info("No jobs to run at this time");
5255
return;
5356
}
5457

@@ -76,7 +79,13 @@ public void startScheduler() {
7679
});
7780

7881
} catch (Exception e) {
79-
logger.error("Error in AccountJobs scheduler", e);
82+
logger.error("!!! EXCEPTION in AccountJobs scheduler !!!", e);
83+
e.printStackTrace();
84+
System.err.println("EXCEPTION: " + e.getMessage());
85+
} catch (Throwable t) {
86+
logger.error("!!! THROWABLE in AccountJobs scheduler !!!", t);
87+
t.printStackTrace();
88+
System.err.println("THROWABLE: " + t.getMessage());
8089
}
8190
}, 0, 5, TimeUnit.SECONDS);
8291

0 commit comments

Comments
 (0)