Skip to content

Commit 3b850ed

Browse files
updated the hpa and docker code
1 parent ba4e82e commit 3b850ed

File tree

3 files changed

+74
-44
lines changed

3 files changed

+74
-44
lines changed

judge-image-for-worker/workers/index.js

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,29 @@ const __filename = fileURLToPath(import.meta.url);
3838
const __dirname = path.dirname(__filename);
3939

4040
// Unique worker ID
41-
const WORKER_FIELD = process.env.WORKER_FIELD?.trim() || `worker_${os.hostname()}_${Math.floor(Date.now() / 1000)}`;
41+
const WORKER_FIELD =
42+
process.env.WORKER_FIELD?.trim() ||
43+
`worker_${os.hostname()}_${Math.floor(Date.now() / 1000)}`;
4244

4345
console.log("Worker ID:", WORKER_FIELD);
4446

4547
const port = 5000;
4648
const redis_server = await connectredis();
4749

48-
4950
// -----------------------------------
5051
// Code Compilation Logic
5152
// -----------------------------------
5253
async function compileCode(language, codePath, execPath) {
5354
return new Promise((resolve, reject) => {
5455
if (language === "cpp") {
55-
exec(`g++ "${codePath}" -o "${execPath}"`, { timeout: 10000 }, (err, _, stderr) => {
56-
if (err) return reject("C++ Compilation Error:\n" + stderr);
57-
resolve();
58-
});
56+
exec(
57+
`g++ "${codePath}" -o "${execPath}"`,
58+
{ timeout: 10000 },
59+
(err, _, stderr) => {
60+
if (err) return reject("C++ Compilation Error:\n" + stderr);
61+
resolve();
62+
}
63+
);
5964
} else if (language === "java") {
6065
exec(`javac "${codePath}"`, { timeout: 10000 }, (err, _, stderr) => {
6166
if (err) return reject("Java Compilation Error:\n" + stderr);
@@ -70,7 +75,14 @@ async function compileCode(language, codePath, execPath) {
7075
// -----------------------------------
7176
// Testcase Execution Logic
7277
// -----------------------------------
73-
function runTestcase(language, execPath, input, expected_output, timeoutSec, ques_name) {
78+
function runTestcase(
79+
language,
80+
execPath,
81+
input,
82+
expected_output,
83+
timeoutSec,
84+
ques_name
85+
) {
7486
return new Promise((resolve) => {
7587
const timeoutMs = timeoutSec * 1000;
7688
let run;
@@ -79,7 +91,10 @@ function runTestcase(language, execPath, input, expected_output, timeoutSec, que
7991
if (language === "cpp") {
8092
run = spawn(execPath, [], { stdio: ["pipe", "pipe", "pipe"] });
8193
} else if (language === "java") {
82-
run = spawn("java", ["Main"], { cwd: execPath, stdio: ["pipe", "pipe", "pipe"] });
94+
run = spawn("java", ["Main"], {
95+
cwd: execPath,
96+
stdio: ["pipe", "pipe", "pipe"],
97+
});
8398
} else if (language === "python" || language === "py") {
8499
const pythonCmd = process.platform === "win32" ? "python" : "python3";
85100
run = spawn(pythonCmd, [execPath], { stdio: ["pipe", "pipe", "pipe"] });
@@ -132,62 +147,63 @@ function runTestcase(language, execPath, input, expected_output, timeoutSec, que
132147
// Process a single job
133148
// -----------------------------------
134149
async function processJob(ques_name, code, language, testcases) {
135-
const extension = language === "cpp" ? "cpp" : language === "java" ? "java" : "py";
150+
const extension =
151+
language === "cpp" ? "cpp" : language === "java" ? "java" : "py";
136152
const fileName = `${ques_name}_${WORKER_FIELD}_${Date.now()}.${extension}`;
137153
const filePath = path.join(__dirname, fileName);
138-
139154
const execPath =
140-
language === "java" ? __dirname : filePath.replace(/\.\w+$/, language === "cpp" ? ".exe" : ".py");
155+
language === "java"
156+
? __dirname
157+
: filePath.replace(/\.\w+$/, language === "cpp" ? ".exe" : ".py");
141158

142-
// Write user code to file
143159
fs.writeFileSync(filePath, code);
144160

145161
try {
146-
// Compile code (if needed)
147162
await compileCode(language, filePath, execPath);
148163

149-
// Run all test cases in parallel
164+
// Run all testcases
150165
const results = await Promise.all(
151166
testcases.map((tc) =>
152-
runTestcase(language, execPath, tc.input, tc.expected_output, tc.timeout, ques_name)
167+
runTestcase(
168+
language,
169+
execPath,
170+
tc.input,
171+
tc.expected_output,
172+
tc.timeout,
173+
ques_name
174+
)
153175
)
154176
);
155177

156-
// Store results for this worker in Redis
157178
await redis_server.setEx(
158179
`job:${ques_name}:worker:${WORKER_FIELD}`,
159-
30,
180+
60,
160181
JSON.stringify(results)
161182
);
162-
163-
// Mark this worker as completed
183+
// ✅ Mark this worker as completed
184+
console.log("this is ans: ", results)
164185
await redis_server.hSet(`job:${ques_name}:status`, {
165186
[WORKER_FIELD]: "completed",
166187
});
167188
await redis_server.expire(`job:${ques_name}:status`, 30);
168-
169-
console.log(`Completed job ${ques_name} by ${WORKER_FIELD}`);
170189
} catch (err) {
171190
console.error("Error during job processing:", err);
172-
173-
// Store error for this worker
174191
await redis_server.setEx(
175192
`job:${ques_name}:worker:${WORKER_FIELD}`,
176-
30,
193+
60,
177194
JSON.stringify([{ error: err.toString() }])
178195
);
179-
180196
await redis_server.hSet(`job:${ques_name}:status`, {
181197
[WORKER_FIELD]: "completed",
182198
});
183199
} finally {
184-
// Cleanup files
185200
try {
186201
fs.unlinkSync(filePath);
187202
} catch {}
188203
try {
189204
if (language === "cpp") fs.unlinkSync(execPath);
190-
if (language === "java") fs.unlinkSync(filePath.replace(".java", ".class"));
205+
if (language === "java")
206+
fs.unlinkSync(filePath.replace(".java", ".class"));
191207
} catch {}
192208
}
193209
}
@@ -200,7 +216,7 @@ async function pollForJobs() {
200216
try {
201217
const { element: ques_name } = await redis_server.brPop("job_queue", 0);
202218
console.log(`Got job: ${ques_name} by ${WORKER_FIELD}`);
203-
console.log("Resolved Worker ID at startup:", WORKER_FIELD)
219+
console.log("Resolved Worker ID at startup:", WORKER_FIELD);
204220
// Fetch job details
205221
const code = await redis_server.hGet(ques_name, "code");
206222
const language = await redis_server.hGet(ques_name, "language");

lib/src/run.js

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ export async function judge({
3131
sizeout,
3232
}) {
3333
if (!codePath || !ques_name || !timeout || !sizeout || !language) {
34-
throw new Error("codePath, ques_name, timeout, sizeout, and language are required");
34+
throw new Error(
35+
"codePath, ques_name, timeout, sizeout, and language are required"
36+
);
3537
}
3638

3739
let code;
@@ -40,13 +42,23 @@ export async function judge({
4042
code = fs.readFileSync(codePath, "utf-8");
4143

4244
// Security checks
43-
if (code.includes("fopen") || code.includes("system") || code.includes("fork")) {
45+
if (
46+
code.includes("fopen") ||
47+
code.includes("system") ||
48+
code.includes("fork")
49+
) {
4450
throw new Error("Potentially dangerous code detected.");
4551
}
4652

4753
// Split inputs & outputs
48-
const inputParts = input.split("###").map((s) => s.trim()).filter(Boolean);
49-
const outputParts = output.split("###").map((s) => s.trim()).filter(Boolean);
54+
const inputParts = input
55+
.split("###")
56+
.map((s) => s.trim())
57+
.filter(Boolean);
58+
const outputParts = output
59+
.split("###")
60+
.map((s) => s.trim())
61+
.filter(Boolean);
5062

5163
if (inputParts.length !== outputParts.length) {
5264
throw new Error("Number of inputs and outputs do not match!");
@@ -64,7 +76,7 @@ export async function judge({
6476

6577
// At least 150 testcases per worker
6678
const totalWorkers = worker_running.length;
67-
let minPerWorker = 150;
79+
let minPerWorker = 100;
6880
const workerTaskMap = {};
6981

7082
if (testcases.length <= minPerWorker) {
@@ -74,7 +86,9 @@ export async function judge({
7486
// Distribute evenly while ensuring each worker gets at least 150 testcases
7587
let idx = 0;
7688
for (let tc of testcases) {
77-
const workerId = `worker_${Math.floor(idx / minPerWorker) % totalWorkers}`;
89+
const workerId = `worker_${
90+
Math.floor(idx / minPerWorker) % totalWorkers
91+
}`;
7892
if (!workerTaskMap[workerId]) workerTaskMap[workerId] = [];
7993
workerTaskMap[workerId].push(tc);
8094
idx++;
@@ -100,17 +114,15 @@ export async function judge({
100114

101115
// Track completion
102116
const waitUntilCompleted = async () => {
103-
const POLL_INTERVAL = 500;
104-
const MAX_ATTEMPTS = 60;
117+
const POLL_INTERVAL = 500; // 0.5 sec
118+
const MAX_ATTEMPTS = 60; // 30 sec total
105119
let attempts = 0;
106120

107121
while (attempts < MAX_ATTEMPTS) {
108122
const status = await redis.hGetAll(`job:${ques_name}:status`);
109-
const completed = Object.keys(status).filter((k) => status[k] === "completed");
110-
111-
// If all assigned workers are done
112-
if (completed.length === Object.keys(workerTaskMap).length) return true;
113-
123+
if (Object.keys(status).length === totalWorkerCount) {
124+
return true;
125+
}
114126
await new Promise((res) => setTimeout(res, POLL_INTERVAL));
115127
attempts++;
116128
}
@@ -124,9 +136,11 @@ export async function judge({
124136

125137
// Collect results
126138
const results = [];
127-
for (const workerId of Object.keys(workerTaskMap)) {
139+
for (const workerId of Object.keys(status)) {
128140
const data = await redis.get(`job:${ques_name}:worker:${workerId}`);
129-
if (data) results.push(...JSON.parse(data));
141+
if (data) {
142+
results.push(...JSON.parse(data));
143+
}
130144
}
131145

132146
return {
@@ -143,4 +157,3 @@ export async function judge({
143157
}
144158
}
145159
}
146-

worker-ops/judge-workers.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spec:
1616
containers:
1717
- name: judge-worker
1818
image: lightningsagar/worker:3af46f0471137f42a003c12bdfc6a02f929386b4
19+
imagePullPolicy: Always
1920
ports:
2021
- containerPort: 5000
2122
env:

0 commit comments

Comments
 (0)