Skip to content

Commit dda2a60

Browse files
updated the docker image
1 parent d0f87d4 commit dda2a60

File tree

1 file changed

+80
-111
lines changed

1 file changed

+80
-111
lines changed
Lines changed: 80 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// worker-updated.js
12
import express from "express";
23
import fs from "fs";
34
import path from "path";
@@ -10,7 +11,6 @@ import { connectredis } from "./redis/redis.js";
1011

1112
const app = express();
1213

13-
// Allowed origins
1414
const allowedOrigins = [
1515
"https://judge-lib-mg91.vercel.app",
1616
"https://judge-lib-mg91.vercel.app/npm",
@@ -37,223 +37,192 @@ app.use(express.json());
3737
const __filename = fileURLToPath(import.meta.url);
3838
const __dirname = path.dirname(__filename);
3939

40-
// Unique worker ID
40+
// Unique worker/pod name for logging
4141
const WORKER_FIELD =
4242
process.env.WORKER_FIELD?.trim() ||
4343
`worker_${os.hostname()}_${Math.floor(Date.now() / 1000)}`;
4444

45-
console.log("Worker ID:", WORKER_FIELD);
45+
console.log("Resolved Worker ID at startup:", WORKER_FIELD);
4646

4747
const port = 5000;
4848
const redis_server = await connectredis();
4949

50-
// -----------------------------------
51-
// Code Compilation Logic
52-
// -----------------------------------
50+
// ---------- helpers ----------
5351
async function compileCode(language, codePath, execPath) {
5452
return new Promise((resolve, reject) => {
5553
if (language === "cpp") {
56-
exec(
57-
`g++ "${codePath}" -o "${execPath}"`,
58-
{ timeout: 10000 },
59-
(err, _, stderr) => {
60-
if (err) return reject("C++ Compilation Error:\n" + stderr);
61-
resolve();
62-
}
63-
);
54+
exec(`g++ "${codePath}" -o "${execPath}"`, { timeout: 10000 }, (err, _, stderr) => {
55+
if (err) return reject("C++ Compilation Error:\n" + stderr);
56+
resolve();
57+
});
6458
} else if (language === "java") {
6559
exec(`javac "${codePath}"`, { timeout: 10000 }, (err, _, stderr) => {
6660
if (err) return reject("Java Compilation Error:\n" + stderr);
6761
resolve();
6862
});
6963
} else {
70-
resolve(); // Python or interpreted languages don't need compilation
64+
// python/no-compile languages
65+
resolve();
7166
}
7267
});
7368
}
7469

75-
// -----------------------------------
76-
// Testcase Execution Logic
77-
// -----------------------------------
78-
function runTestcase(
79-
language,
80-
execPath,
81-
input,
82-
expected_output,
83-
timeoutSec,
84-
ques_name
85-
) {
70+
function runTestcase(language, execPath, input, expected_output, timeoutSec, ques_name) {
8671
return new Promise((resolve) => {
8772
const timeoutMs = timeoutSec * 1000;
8873
let run;
89-
9074
try {
9175
if (language === "cpp") {
9276
run = spawn(execPath, [], { stdio: ["pipe", "pipe", "pipe"] });
9377
} else if (language === "java") {
94-
run = spawn("java", ["Main"], {
95-
cwd: execPath,
96-
stdio: ["pipe", "pipe", "pipe"],
97-
});
78+
run = spawn("java", ["Main"], { cwd: execPath, stdio: ["pipe", "pipe", "pipe"] });
9879
} else if (language === "python" || language === "py") {
9980
const pythonCmd = process.platform === "win32" ? "python" : "python3";
10081
run = spawn(pythonCmd, [execPath], { stdio: ["pipe", "pipe", "pipe"] });
10182
} else {
102-
return resolve({
103-
input,
104-
expected_output,
105-
result: `Unsupported language: ${language}`,
106-
correct: false,
107-
});
83+
return resolve({ input, expected_output, result: `Unsupported language: ${language}`, correct: false });
10884
}
10985
} catch (err) {
110-
return resolve({
111-
input,
112-
expected_output,
113-
result: `Failed to spawn process for ${language}: ${err.message}`,
114-
correct: false,
115-
});
86+
return resolve({ input, expected_output, result: `Failed to spawn: ${err.message}`, correct: false });
11687
}
11788

11889
let result = "";
11990
let errorOutput = "";
91+
run.stdout.on("data", (d) => (result += d.toString()));
92+
run.stderr.on("data", (d) => (errorOutput += d.toString()));
12093

121-
run.stdout.on("data", (data) => (result += data.toString()));
122-
run.stderr.on("data", (data) => (errorOutput += data.toString()));
123-
124-
const timer = setTimeout(() => run.kill("SIGKILL"), timeoutMs);
94+
const timer = setTimeout(() => {
95+
try { run.kill("SIGKILL"); } catch (_) {}
96+
}, timeoutMs);
12597

12698
run.stdin.write(input.replace(/\r\n/g, "\n").trim() + "\n");
12799
run.stdin.end();
128100

129101
run.on("close", (code) => {
130102
clearTimeout(timer);
131103
let correct = false;
132-
133-
if (code === 0 && expected_output) {
104+
if (code === 0 && expected_output !== undefined && expected_output !== null) {
134105
correct = result.trim() === expected_output.trim();
135106
} else if (code === null) {
136107
result = `Timeout exceeded (${timeoutMs}ms)`;
137-
} else {
108+
} else if (code !== 0) {
138109
result = `Runtime error (exit code ${code})\n${errorOutput}`;
139110
}
140-
141111
resolve({ input, expected_output, result, correct, timeout: timeoutSec });
142112
});
143113
});
144114
}
145115

146-
// -----------------------------------
147-
// Process a single job
148-
// -----------------------------------
149-
async function processJob(jobKey, code, language, testcases) {
150-
// Extract ques_name from jobKey for file naming
151-
const ques_name = jobKey.split(':')[1] || 'unknown';
152-
153-
const extension =
154-
language === "cpp" ? "cpp" : language === "java" ? "java" : "py";
155-
const fileName = `${ques_name}_${WORKER_FIELD}_${Date.now()}.${extension}`;
116+
// processJob now accepts the assignedWorkerId (the slot that judge created)
117+
async function processJob(ques_name, code, language, testcases, assignedWorkerId) {
118+
const extension = language === "cpp" ? "cpp" : language === "java" ? "java" : "py";
119+
const fileName = `${ques_name.replace(/\s+/g, "_")}_${WORKER_FIELD}_${Date.now()}.${extension}`;
156120
const filePath = path.join(__dirname, fileName);
157-
const execPath =
158-
language === "java"
159-
? __dirname
160-
: filePath.replace(/\.\w+$/, language === "cpp" ? ".exe" : ".py");
121+
const execPath = language === "java" ? __dirname : filePath.replace(/\.\w+$/, language === "cpp" ? ".exe" : ".py");
161122

162123
fs.writeFileSync(filePath, code);
163124

164125
try {
165126
await compileCode(language, filePath, execPath);
166127

167-
// Run all testcases
128+
// Run all testcases (parallel for speed)
168129
const results = await Promise.all(
169130
testcases.map((tc) =>
170-
runTestcase(
171-
language,
172-
execPath,
173-
tc.input,
174-
tc.expected_output,
175-
tc.timeout,
176-
ques_name
177-
)
131+
runTestcase(language, execPath, tc.input, tc.expected_output, tc.timeout || 2.5, ques_name)
178132
)
179133
);
180134

181-
// FIXED: Store results with correct Redis key format
135+
// Save results to a deterministic key that judge will read:
136+
// job:<ques_name>:worker:<assignedWorkerId>:results
182137
await redis_server.setEx(
183-
`${jobKey}:results`,
184-
60,
138+
`job:${ques_name}:worker:${assignedWorkerId}:results`,
139+
120,
185140
JSON.stringify(results)
186141
);
187-
console.log("Job results stored:", results);
142+
143+
// Mark status for that worker slot. Value contains which pod actually ran it.
188144
await redis_server.hSet(`job:${ques_name}:status`, {
189-
[WORKER_FIELD]: "completed",
145+
[assignedWorkerId]: WORKER_FIELD,
190146
});
191-
await redis_server.expire(`job:${ques_name}:status`, 30);
147+
await redis_server.expire(`job:${ques_name}:status`, 120);
148+
149+
console.log(`Completed job ${ques_name} slot ${assignedWorkerId} by pod ${WORKER_FIELD}`);
192150
} catch (err) {
193151
console.error("Error during job processing:", err);
152+
194153
await redis_server.setEx(
195-
`${jobKey}:results`,
196-
60,
154+
`job:${ques_name}:worker:${assignedWorkerId}:results`,
155+
120,
197156
JSON.stringify([{ error: err.toString() }])
198157
);
199158
await redis_server.hSet(`job:${ques_name}:status`, {
200-
[WORKER_FIELD]: "completed",
159+
[assignedWorkerId]: WORKER_FIELD,
201160
});
202161
} finally {
203-
try {
204-
fs.unlinkSync(filePath);
205-
} catch {}
162+
// cleanup
163+
try { fs.unlinkSync(filePath); } catch (_) {}
206164
try {
207165
if (language === "cpp") fs.unlinkSync(execPath);
208-
if (language === "java")
209-
fs.unlinkSync(filePath.replace(".java", ".class"));
210-
} catch {}
166+
if (language === "java") fs.unlinkSync(filePath.replace(".java", ".class"));
167+
} catch (_) {}
211168
}
212169
}
213170

214-
// -----------------------------------
215-
// Polling loop to get jobs
216-
// -----------------------------------
171+
// ---------- main queue loop ----------
217172
async function pollForJobs() {
218173
while (true) {
219174
try {
220-
// FIXED: Get the Redis key directly
221-
const result = await redis_server.brPop("job_queue", 0);
222-
const jobKey = result.element;
223-
224-
console.log(`Got job: ${jobKey} by ${WORKER_FIELD}`);
225-
226-
// FIXED: Fetch job details using the correct Redis key
227-
const code = await redis_server.hGet(jobKey, "code");
228-
const language = await redis_server.hGet(jobKey, "language");
229-
const data_testcases = await redis_server.hGet(jobKey, "testcases");
175+
// brPop returns object like { key, element } where element is the pushed string
176+
const br = await redis_server.brPop("job_queue", 0);
177+
if (!br || !br.element) continue;
178+
179+
let payload;
180+
try {
181+
payload = JSON.parse(br.element);
182+
} catch (err) {
183+
console.warn("Queue item not JSON, treating as simple jobId:", br.element);
184+
// If old-style push (just ques_name) is used, let any worker handle all testcases:
185+
payload = { ques_name: br.element, workerId: null };
186+
}
187+
188+
const { ques_name, workerId } = payload;
189+
console.log(`Got job payload: ${JSON.stringify(payload)} by pod ${WORKER_FIELD}`);
190+
191+
// If judge pushed per-worker payloads (with workerId), look in that worker-specific redis hash
192+
// else fall back to single-job key `job:<ques_name>`
193+
const redisKey = workerId ? `job:${ques_name}:worker:${workerId}` : `job:${ques_name}`;
194+
195+
// job was stored using hSet(redisKey, { code, language, testcases })
196+
const code = await redis_server.hGet(redisKey, "code");
197+
const language = await redis_server.hGet(redisKey, "language");
198+
const data_testcases = await redis_server.hGet(redisKey, "testcases");
230199

231200
if (!data_testcases) {
232-
console.warn(`No testcases found for job ${jobKey}`);
201+
console.warn(`No testcases found for job payload ${JSON.stringify(payload)}`);
202+
// continue to next queue item
233203
continue;
234204
}
235205

236206
const testcases = JSON.parse(data_testcases);
237-
console.log(`Processing ${testcases.length} testcases for job ${jobKey}`);
238207

239-
// Process all testcases for this job at once
240-
await processJob(jobKey, code, language, testcases);
208+
// Process this worker-slot (if workerId present, pass it; else process whole job)
209+
const assignedWorkerId = workerId || WORKER_FIELD;
210+
await processJob(ques_name, code, language, testcases, assignedWorkerId);
241211
} catch (err) {
242212
console.error("Error while polling job:", err);
213+
// small backoff to avoid tight error loop
214+
await new Promise((r) => setTimeout(r, 500));
243215
}
244216
}
245217
}
246218

247219
pollForJobs();
248220

249-
// -----------------------------------
250-
// Health check endpoint
251-
// -----------------------------------
221+
// health endpoint
252222
app.get("/ping", (req, res) => {
253-
console.log("Ping received at", new Date().toISOString());
254223
res.send(`Worker ${WORKER_FIELD} is awake`);
255224
});
256225

257226
app.listen(port, () => {
258227
console.log(`${WORKER_FIELD} running at port ${port}`);
259-
});
228+
});

0 commit comments

Comments
 (0)