Skip to content

Commit 5dbbe69

Browse files
final fix of worker
1 parent 13fbe08 commit 5dbbe69

File tree

4 files changed

+144
-94
lines changed

4 files changed

+144
-94
lines changed

example/src/index.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ const main = async () => {
1515
language: "cpp",
1616
timeout: "2",
1717
sizeout: "64",
18-
});
19-
console.log("this is ans ",result);
18+
});
19+
console.log("this is ans ", result);
2020
};
2121

2222
main();
2323

24-
function getPath(){
24+
function getPath() {
2525
const code = `
2626
#include<iostream>
2727
#include<vector>
@@ -49,11 +49,11 @@ function getPath(){
4949
return tmpPath;
5050
}
5151

52-
// creating ~200 test case
52+
// creating ~200 test case
5353
function createTestcase() {
5454
const inputs = [];
5555
const outputs = [];
56-
for (let i = 1; i <= 15; i++) {
56+
for (let i = 1; i <= 305; i++) {
5757
const arr = Array.from({ length: i }, (_, idx) => idx + 1).join(" ");
5858
inputs.push(`${i} ${arr}`);
5959
const sum = (i * (i + 1)) / 2;
@@ -63,5 +63,3 @@ function createTestcase() {
6363
const outputStr = outputs.join(" ### ");
6464
return { inputStr, outputStr };
6565
}
66-
67-

judge-image-for-worker/workers/index.js

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ app.use(express.json());
3333

3434
const __filename = fileURLToPath(import.meta.url);
3535
const __dirname = path.dirname(__filename);
36-
const WORKER_FIELD = process.env.WORKER_FIELD;
3736
const port = process.env.PORT;
3837
const redis_server = await connectredis();
3938

@@ -145,7 +144,7 @@ function runTestcase(
145144
async function processJob(ques_name, code, language, testcases) {
146145
const extension =
147146
language === "cpp" ? "cpp" : language === "java" ? "java" : "py";
148-
const fileName = `${ques_name}_${WORKER_FIELD}.${extension}`;
147+
const fileName = `${ques_name}.${extension}`;
149148
const filePath = path.join(__dirname, fileName);
150149
const execPath =
151150
language === "java"
@@ -170,24 +169,25 @@ async function processJob(ques_name, code, language, testcases) {
170169
)
171170
);
172171
console.log(results, "this is results");
172+
// Store job result
173173
await redis_server.setEx(
174-
`job:${ques_name}:worker:${WORKER_FIELD}`,
174+
`job:${ques_name}:result`,
175175
30,
176176
JSON.stringify(results)
177177
);
178178
await redis_server.hSet(`job:${ques_name}:status`, {
179-
[WORKER_FIELD]: "completed",
179+
state: "completed",
180180
});
181181
await redis_server.expire(`job:${ques_name}:status`, 30);
182182
} catch (err) {
183183
console.error("Error during job processing:", err);
184184
await redis_server.setEx(
185-
`job:${ques_name}:worker:${WORKER_FIELD}`,
185+
`job:${ques_name}:result`,
186186
30,
187187
JSON.stringify([{ error: err.toString() }])
188188
);
189189
await redis_server.hSet(`job:${ques_name}:status`, {
190-
[WORKER_FIELD]: "completed",
190+
state: "failed",
191191
});
192192
} finally {
193193
try {
@@ -204,17 +204,44 @@ async function processJob(ques_name, code, language, testcases) {
204204
async function pollForJobs() {
205205
while (true) {
206206
try {
207-
const { element: ques_name } = await redis_server.brPop("job_queue", 0);
208-
console.log(`Got job: ${ques_name}`);
207+
// Fetch a job name
208+
const result = await redis_server.brPop("job_queue", 0);
209209

210+
if (!result) {
211+
console.warn("[Worker] BRPOP returned empty");
212+
continue;
213+
}
214+
215+
const ques_name = result.element;
216+
console.log(`[Worker] Got job: ${ques_name}`);
217+
218+
// Fetch code & language (from Redis Hash or DB)
210219
const code = await redis_server.hGet(ques_name, "code");
211220
const language = await redis_server.hGet(ques_name, "language");
212-
const data_testcases = await redis_server.hGet(ques_name, WORKER_FIELD);
213-
if (!data_testcases) continue;
214-
const testcases = JSON.parse(data_testcases);
221+
222+
// Get next batch of test cases
223+
const batchData = await redis_server.lPop(`testcase_queue:${ques_name}`);
224+
if (!batchData) {
225+
console.warn(`[Worker] No batch found for ${ques_name}`);
226+
continue;
227+
}
228+
229+
const testcases = JSON.parse(batchData);
230+
231+
// Process the job
215232
await processJob(ques_name, code, language, testcases);
233+
234+
// Update status
235+
await redis_server.hIncrBy(
236+
`job:${ques_name}:status`,
237+
"completedBatches",
238+
1
239+
);
240+
console.log(
241+
`[Worker] Completed one batch of ${testcases.length} for ${ques_name}`
242+
);
216243
} catch (err) {
217-
console.error("Error while polling job:", err);
244+
console.error("[Worker] Error while polling job:", err);
218245
}
219246
}
220247
}
@@ -226,5 +253,5 @@ app.get("/ping", (req, res) => {
226253
});
227254

228255
app.listen(port, () => {
229-
console.log(`${WORKER_FIELD} running at port ${port}`);
256+
console.log(`running at port ${port}`);
230257
});

judge-image-for-worker/workers/run.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
PORT=3001 WORKER_FIELD=worker_0 nodemon index.js &
2-
PORT=3002 WORKER_FIELD=worker_1 nodemon index.js &
3-
PORT=3003 WORKER_FIELD=worker_2 nodemon index.js &
1+
PORT=3001 nodemon index.js &
2+
PORT=3002 nodemon index.js &
3+
PORT=3003 nodemon index.js &
44

55

66

lib/src/run.js

Lines changed: 96 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,8 @@ import fs from "fs";
22
import "dotenv/config";
33
import { connectredis } from "./redis.js";
44

5-
const worker_running = [
6-
"https://workers-judge.onrender.com/",
7-
"https://workers-judge-1.onrender.com/",
8-
"https://workers-judge-2.onrender.com/",
9-
];
10-
115
const redis = await connectredis();
126

13-
/**
14-
* @param {string} codePath - Absolute path to uploaded code file
15-
* @param {string} ques_name - Unique job/question ID
16-
* @param {string} input - Raw input test cases (separated by ###)
17-
* @param {string} output - Raw expected outputs (separated by ###)
18-
* @param {string} timeout - Max timeout in seconds
19-
* @param {string} sizeout - Max output size in KB
20-
*/
217
export async function judge({
228
codePath,
239
language,
@@ -27,79 +13,108 @@ export async function judge({
2713
timeout,
2814
sizeout,
2915
}) {
30-
if (!codePath || !ques_name || !timeout || !sizeout || !language)
31-
throw new Error("codePath and ques_name are required");
16+
if (!codePath || !ques_name || !timeout || !sizeout || !language) {
17+
throw new Error("codePath, ques_name, timeout, sizeout, and language are required");
18+
}
3219

3320
let code;
3421
try {
22+
// 1. Read submitted code
3523
code = fs.readFileSync(codePath, "utf-8");
3624

37-
if (
38-
code.includes("fopen") ||
39-
code.includes("system") ||
40-
code.includes("fork")
41-
) {
42-
throw new Error("Potentially dangerous code detected.");
25+
// 2. Validate code for dangerous functions
26+
const dangerous = ["fopen", "system", "fork", "exec", "eval", "import os", "subprocess"];
27+
for (const keyword of dangerous) {
28+
if (code.includes(keyword)) {
29+
throw new Error(`Potentially dangerous code detected: ${keyword}`);
30+
}
4331
}
4432

45-
const inputParts = input
46-
.split("###")
47-
.map((s) => s.trim())
48-
.filter(Boolean);
49-
const outputParts = output
50-
.split("###")
51-
.map((s) => s.trim())
52-
.filter(Boolean);
53-
54-
const testcases = inputParts.map((input, i) => ({
55-
input,
33+
// 3. Prepare test cases
34+
const inputParts = input.split("###").map(s => s.trim()).filter(Boolean);
35+
const outputParts = output.split("###").map(s => s.trim()).filter(Boolean);
36+
37+
if (inputParts.length !== outputParts.length) {
38+
throw new Error(
39+
`Mismatch between input (${inputParts.length}) and output (${outputParts.length})`
40+
);
41+
}
42+
43+
const testcases = inputParts.map((inp, i) => ({
44+
input: inp,
5645
expected_output: outputParts[i],
5746
correct: null,
58-
timeout:timeout || 2.5,
47+
timeout: timeout || 2.5,
5948
sizeout,
6049
result: null,
6150
}));
6251

63-
const workerCount = worker_running.length;
64-
const workerTaskMap = {};
65-
66-
if (testcases.length < 3) {
67-
workerTaskMap['worker_0'] = testcases;
68-
} else {
69-
testcases.forEach((tc, i) => {
70-
const workerId = `worker_${i % workerCount}`;
71-
if (!workerTaskMap[workerId]) workerTaskMap[workerId] = [];
72-
workerTaskMap[workerId].push(tc);
73-
});
74-
}
75-
76-
const redisPayload = {
52+
// 4. Clean old data for this job
53+
const pipeline = redis.multi();
54+
pipeline.del(`testcase_queue:${ques_name}`);
55+
pipeline.del(`results_queue:${ques_name}`);
56+
pipeline.del(`job:${ques_name}:status`);
57+
pipeline.del(ques_name);
58+
pipeline.del(`job:${ques_name}:result`);
59+
await pipeline.exec();
60+
61+
// 5. Save code + metadata
62+
await redis.hSet(ques_name, {
7763
code,
7864
language,
79-
...Object.fromEntries(
80-
Object.entries(workerTaskMap).map(([k, v]) => [k, JSON.stringify(v)])
81-
),
82-
};
83-
84-
await redis.hSet(ques_name, redisPayload);
85-
const assignedWorkers = Object.keys(workerTaskMap);
65+
totalTestCases: testcases.length.toString(),
66+
timeout: timeout.toString(),
67+
sizeout: sizeout.toString(),
68+
createdTime: Date.now().toString(),
69+
});
70+
71+
// 6. Split testcases into batches of 100
72+
const BATCH_SIZE = 100;
73+
const batches = [];
74+
for (let i = 0; i < testcases.length; i += BATCH_SIZE) {
75+
batches.push(testcases.slice(i, i + BATCH_SIZE));
76+
}
8677

87-
await Promise.all(
88-
assignedWorkers.map(() => redis.lPush("job_queue", ques_name))
78+
console.log(
79+
`Splitting into ${batches.length} batches of up to ${BATCH_SIZE} test cases each`
8980
);
9081

82+
// 7. Push each batch to Redis queue
83+
for (const batch of batches) {
84+
await redis.rPush(`testcase_queue:${ques_name}`, JSON.stringify(batch));
85+
await redis.lPush("job_queue", ques_name);
86+
}
87+
88+
// 8. Initialize job status
89+
await redis.hSet(`job:${ques_name}:status`, {
90+
status: "queued",
91+
totalBatches: batches.length.toString(),
92+
completedBatches: "0",
93+
createdTime: Date.now().toString(),
94+
});
95+
96+
console.log(`[DEBUG] Job ${ques_name} queued with ${batches.length} batches`);
97+
98+
// 9. Wait for job completion
9199
const waitUntilCompleted = async () => {
92-
const POLL_INTERVAL = 500;
93-
const MAX_ATTEMPTS = 60;
100+
const POLL_INTERVAL = 500; // ms
101+
const MAX_ATTEMPTS = 60; // 30 seconds
94102
let attempts = 0;
95103

96104
while (attempts < MAX_ATTEMPTS) {
97105
const status = await redis.hGetAll(`job:${ques_name}:status`);
98-
const completed = Object.keys(status).filter(
99-
(k) => status[k] === "completed"
106+
107+
const totalBatches = parseInt(status.totalBatches || "0", 10);
108+
const completedBatches = parseInt(status.completedBatches || "0", 10);
109+
110+
console.log(
111+
`[DEBUG] Polling... Completed ${completedBatches}/${totalBatches} batches`
100112
);
101-
if (assignedWorkers.every((worker) => completed.includes(worker)))
113+
114+
if (completedBatches === totalBatches && totalBatches > 0) {
102115
return true;
116+
}
117+
103118
await new Promise((res) => setTimeout(res, POLL_INTERVAL));
104119
attempts++;
105120
}
@@ -108,26 +123,36 @@ export async function judge({
108123
};
109124

110125
const completed = await waitUntilCompleted();
111-
112126
if (!completed) {
113127
throw new Error("Timeout waiting for workers to finish");
114128
}
115129

116-
const results = [];
117-
for (const workerId of assignedWorkers) {
118-
const data = await redis.get(`job:${ques_name}:worker:${workerId}`);
119-
if (data) results.push(...JSON.parse(data));
130+
const finalData = await redis.get(`job:${ques_name}:result`);
131+
let results = [];
132+
if (finalData) {
133+
try {
134+
results = JSON.parse(finalData);
135+
} catch (err) {
136+
console.error("Error parsing job result JSON:", err);
137+
}
138+
} else {
139+
console.warn(`[WARN] No results found for job ${ques_name}`);
120140
}
121-
122141
return {
142+
status: "completed",
123143
jobId: ques_name,
144+
totalTestCases: testcases.length,
145+
totalBatches: batches.length,
124146
results,
125147
};
148+
126149
} catch (err) {
127-
console.log(err);
150+
console.error("Judge error:", err.message);
151+
throw err;
128152
} finally {
129-
if (fs.existsSync(codePath)) {
153+
if (codePath && fs.existsSync(codePath)) {
130154
fs.unlinkSync(codePath);
155+
console.log(`Cleaned up uploaded code file: ${codePath}`);
131156
}
132157
}
133-
}
158+
}

0 commit comments

Comments
 (0)