Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ https:
model.onnx
test_scripts/*
run-master.sh
run-threat-consumer.sh
run-threat-consumer.sh
run-account-jobs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.akto.dao.context.Context;
import com.akto.dao.jobs.AccountJobDao;
import com.akto.dto.jobs.AccountJob;
import com.akto.dto.jobs.JobStatus;
import com.akto.dto.jobs.ScheduleType;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.opensymphony.xwork2.Action;
Expand Down Expand Up @@ -72,19 +74,28 @@ public String initiateImport() {
// Convert Map<String, String> config to Map<String, Object> for generic storage
Map<String, Object> jobConfig = new HashMap<>(config);

int now = Context.now();
AccountJob accountJob = new AccountJob(
Context.accountId.get(), // accountId
"AI_AGENT_CONNECTOR", // jobType (generic)
connectorType, // subType (N8N, LANGCHAIN, COPILOT_STUDIO)
jobConfig, // flexible config map
interval, // recurringIntervalSeconds
Context.now(), // createdAt
Context.now() // lastUpdatedAt
now, // createdAt
now // lastUpdatedAt
);

// Set execution tracking fields for job scheduler
accountJob.setJobStatus(JobStatus.SCHEDULED);
accountJob.setScheduleType(ScheduleType.RECURRING);
accountJob.setScheduledAt(now); // Schedule immediately
accountJob.setHeartbeatAt(0);
accountJob.setStartedAt(0);
accountJob.setFinishedAt(0);

AccountJobDao.instance.insertOne(accountJob);
this.jobId = accountJob.getId().toHexString();
loggerMaker.info("Successfully created account-level job for " + connectorType + " connector with job ID: " + this.jobId + ", interval: " + interval + "s", LogDb.DASHBOARD);
loggerMaker.info("Successfully created account-level job for " + connectorType + " connector with job ID: " + this.jobId + ", interval: " + interval + "s, status: SCHEDULED", LogDb.DASHBOARD);

return Action.SUCCESS.toUpperCase();

Expand Down
9 changes: 5 additions & 4 deletions libs/dao/src/main/java/com/akto/dao/jobs/AccountJobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ public void createIndicesIfAbsent() {
String dbName = Context.accountId.get() + "";
createCollectionIfAbsent(dbName, getCollName(), new CreateCollectionOptions());

// Index 1: Filter by job type, sort by creation date
String[] fieldNames = {AccountJob.JOB_TYPE, AccountJob.CREATED_AT};
// Index 1: Critical for job polling - covers both SCHEDULED and stale RUNNING jobs
// Query: {jobStatus: {$in: [SCHEDULED, RUNNING]}, scheduledAt/heartbeatAt: {$lt: now}}
String[] fieldNames = {AccountJob.JOB_STATUS, AccountJob.SCHEDULED_AT, AccountJob.HEARTBEAT_AT};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), fieldNames, false);

// Index 2: Filter by job type and sub-type, sort by creation date
fieldNames = new String[]{AccountJob.JOB_TYPE, AccountJob.SUB_TYPE, AccountJob.CREATED_AT};
// Index 2: For querying jobs by type and subtype (useful for dashboard/admin queries)
fieldNames = new String[]{AccountJob.JOB_TYPE, AccountJob.SUB_TYPE, AccountJob.JOB_STATUS};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), fieldNames, false);
}
}
18 changes: 18 additions & 0 deletions libs/dao/src/main/java/com/akto/dto/jobs/AccountJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public class AccountJob {
public static final String CREATED_AT = "createdAt";
public static final String LAST_UPDATED_AT = "lastUpdatedAt";

// Execution tracking field constants
public static final String JOB_STATUS = "jobStatus";
public static final String SCHEDULE_TYPE = "scheduleType";
public static final String SCHEDULED_AT = "scheduledAt";
public static final String STARTED_AT = "startedAt";
public static final String FINISHED_AT = "finishedAt";
public static final String HEARTBEAT_AT = "heartbeatAt";
public static final String ERROR = "error";

// Fields
private ObjectId id; // Primary key
private int accountId; // Account identifier
Expand All @@ -41,6 +50,15 @@ public class AccountJob {
private int createdAt; // Creation timestamp
private int lastUpdatedAt; // Last update timestamp

// Execution tracking fields
private JobStatus jobStatus; // Job status (SCHEDULED, RUNNING, COMPLETED, FAILED, STOPPED)
private ScheduleType scheduleType; // Schedule type (RECURRING, RUN_ONCE)
private int scheduledAt; // When the job should run (epoch seconds)
private int startedAt; // When job execution started (epoch seconds)
private int finishedAt; // When job execution finished (epoch seconds)
private int heartbeatAt; // Last heartbeat timestamp (epoch seconds)
private String error; // Error message if job failed

/**
* Constructor without id field (MongoDB will auto-generate the id).
* Use this constructor when creating new AccountJob instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ private AIAgentConnectorConstants() {
public static final String AZURE_CONTAINER_NAME = "binaries";

// Default Job Settings
public static final int DEFAULT_RECURRING_INTERVAL_SECONDS = 300; // 5 minutes
public static final int DEFAULT_RECURRING_INTERVAL_SECONDS = 5; // 5 seconds
}
Loading