1- // worker-updated.js
21import express from "express" ;
32import fs from "fs" ;
43import path from "path" ;
54import { exec , spawn } from "child_process" ;
65import { fileURLToPath } from "url" ;
7- import os from "os " ;
6+ import fetch from "node-fetch " ;
87import "dotenv/config" ;
9- import cors from "cors" ;
108import { connectredis } from "./redis/redis.js" ;
9+ import cors from "cors" ;
1110
1211const app = express ( ) ;
13-
1412const allowedOrigins = [
1513 "https://judge-lib-mg91.vercel.app" ,
1614 "https://judge-lib-mg91.vercel.app/npm" ,
@@ -31,26 +29,17 @@ app.use(
3129 allowedHeaders : [ "Content-Type" ] ,
3230 } )
3331) ;
34-
3532app . use ( express . json ( ) ) ;
3633
3734const __filename = fileURLToPath ( import . meta. url ) ;
3835const __dirname = path . dirname ( __filename ) ;
39-
40- // Unique worker/pod name for logging
41- const WORKER_FIELD =
42- process . env . WORKER_FIELD ?. trim ( ) ||
43- `worker_${ os . hostname ( ) } _${ Math . floor ( Date . now ( ) / 1000 ) } ` ;
44-
45- console . log ( "Resolved Worker ID at startup:" , WORKER_FIELD ) ;
46-
36+ const WORKER_FIELD = "worker_0" ;
4737const port = 5000 ;
4838const redis_server = await connectredis ( ) ;
4939
50- // ---------- helpers ----------
5140async function compileCode ( language , codePath , execPath ) {
52- return new Promise ( ( resolve , reject ) => {
53- if ( language === "cpp" ) {
41+ if ( language === "cpp" ) {
42+ return new Promise ( ( resolve , reject ) => {
5443 exec (
5544 `g++ "${ codePath } " -o "${ execPath } "` ,
5645 { timeout : 10000 } ,
@@ -59,183 +48,185 @@ async function compileCode(language, codePath, execPath) {
5948 resolve ( ) ;
6049 }
6150 ) ;
62- } else if ( language === "java" ) {
51+ } ) ;
52+ } else if ( language === "java" ) {
53+ return new Promise ( ( resolve , reject ) => {
6354 exec ( `javac "${ codePath } "` , { timeout : 10000 } , ( err , _ , stderr ) => {
6455 if ( err ) return reject ( "Java Compilation Error:\n" + stderr ) ;
6556 resolve ( ) ;
6657 } ) ;
67- } else {
68- // python/no-compile languages
69- resolve ( ) ;
70- }
71- } ) ;
58+ } ) ;
59+ }
60+ return Promise . resolve ( ) ;
7261}
7362
74- function runTestcase ( language , execPath , input , expected_output , timeoutSec , ques_name ) {
63+ function runTestcase (
64+ language ,
65+ execPath ,
66+ input ,
67+ expected_output ,
68+ timeoutSec ,
69+ ques_name
70+ ) {
7571 return new Promise ( ( resolve ) => {
72+ // if (timeoutSec > 2.5) timeoutSec = 2.5;
7673 const timeoutMs = timeoutSec * 1000 ;
74+
7775 let run ;
7876 try {
7977 if ( language === "cpp" ) {
8078 run = spawn ( execPath , [ ] , { stdio : [ "pipe" , "pipe" , "pipe" ] } ) ;
8179 } else if ( language === "java" ) {
82- // execPath should be the directory containing .class files
83- run = spawn ( "java" , [ "Main" ] , { cwd : execPath , stdio : [ "pipe" , "pipe" , "pipe" ] } ) ;
80+ run = spawn ( "java" , [ "Main" ] , {
81+ cwd : execPath ,
82+ stdio : [ "pipe" , "pipe" , "pipe" ] ,
83+ } ) ;
8484 } else if ( language === "python" || language === "py" ) {
8585 const pythonCmd = process . platform === "win32" ? "python" : "python3" ;
86- run = spawn ( pythonCmd , [ execPath ] , { stdio : [ "pipe" , "pipe" , "pipe" ] } ) ;
86+ run = spawn ( pythonCmd , [ execPath ] , {
87+ stdio : [ "pipe" , "pipe" , "pipe" ] ,
88+ } ) ;
8789 } else {
88- return resolve ( { input, expected_output, result : `Unsupported language: ${ language } ` , correct : false } ) ;
90+ return resolve ( {
91+ ques_name,
92+ input,
93+ expected_output,
94+ result : `Unsupported language: ${ language } ` ,
95+ correct : false ,
96+ } ) ;
8997 }
9098 } catch ( err ) {
91- return resolve ( { input, expected_output, result : `Failed to spawn: ${ err . message } ` , correct : false } ) ;
99+ return resolve ( {
100+ ques_name,
101+ input,
102+ expected_output,
103+ result : `Failed to spawn process for ${ language } : ${ err . message } ` ,
104+ correct : false ,
105+ } ) ;
106+ }
107+
108+ if ( ! run || ! run . stdout ) {
109+ return resolve ( {
110+ input,
111+ expected_output,
112+ result : `Failed to execute process for ${ language } ` ,
113+ correct : false ,
114+ } ) ;
92115 }
93116
94117 let result = "" ;
95118 let errorOutput = "" ;
96- run . stdout . on ( "data" , ( d ) => ( result += d . toString ( ) ) ) ;
97- run . stderr . on ( "data" , ( d ) => ( errorOutput += d . toString ( ) ) ) ;
119+ run . stdout . on ( "data" , ( data ) => ( result += data . toString ( ) ) ) ;
120+ run . stderr . on ( "data" , ( data ) => ( errorOutput += data . toString ( ) ) ) ;
98121
99- const timer = setTimeout ( ( ) => {
100- try { run . kill ( "SIGKILL" ) ; } catch ( _ ) { }
101- } , timeoutMs ) ;
122+ const timer = setTimeout ( ( ) => run . kill ( "SIGKILL" ) , timeoutMs ) ;
102123
103124 run . stdin . write ( input . replace ( / \r \n / g, "\n" ) . trim ( ) + "\n" ) ;
104125 run . stdin . end ( ) ;
105126
106127 run . on ( "close" , ( code ) => {
107128 clearTimeout ( timer ) ;
108129 let correct = false ;
109- if ( code === 0 && expected_output !== undefined && expected_output !== null ) {
130+ console . log ( expected_output ) ;
131+ console . log ( code , expected_output , result . trim ( ) ) ;
132+ if ( code === 0 && expected_output ) {
110133 correct = result . trim ( ) === expected_output . trim ( ) ;
111134 } else if ( code === null ) {
112135 result = `Timeout exceeded (${ timeoutMs } ms)` ;
113- } else if ( code !== 0 ) {
136+ } else {
114137 result = `Runtime error (exit code ${ code } )\n${ errorOutput } ` ;
115138 }
139+
116140 resolve ( { input, expected_output, result, correct, timeout : timeoutSec } ) ;
117141 } ) ;
118142 } ) ;
119143}
120144
121- // processJob now accepts the assignedWorkerId (the slot that judge created)
122- async function processJob ( ques_name , code , language , testcases , assignedWorkerId ) {
123- const extension = language === "cpp" ? "cpp" : language === "java" ? "java" : "py" ;
124- const fileName = `${ ques_name . replace ( / \s + / g, "_" ) } _${ WORKER_FIELD } _${ Date . now ( ) } .${ extension } ` ;
145+ async function processJob ( ques_name , code , language , testcases ) {
146+ const extension =
147+ language === "cpp" ? "cpp" : language === "java" ? "java" : "py" ;
148+ // const fileName = `${ques_name}_${WORKER_FIELD}.${extension}`;
149+ const fileName = `${ ques_name } _${ WORKER_FIELD } _${ Date . now ( ) } .${ extension } ` ;
150+
125151 const filePath = path . join ( __dirname , fileName ) ;
126- const execPath = language === "java" ? __dirname : filePath . replace ( / \. \w + $ / , language === "cpp" ? ".exe" : ".py" ) ;
152+ const execPath =
153+ language === "java"
154+ ? __dirname
155+ : filePath . replace ( / \. \w + $ / , language === "cpp" ? ".exe" : ".py" ) ;
127156
128157 fs . writeFileSync ( filePath , code ) ;
129158
130159 try {
131160 await compileCode ( language , filePath , execPath ) ;
132161
133- // Run all testcases (parallel for speed)
134162 const results = await Promise . all (
135163 testcases . map ( ( tc ) =>
136- runTestcase ( language , execPath , tc . input , tc . expected_output , tc . timeout || 2.5 , ques_name )
164+ runTestcase (
165+ language ,
166+ execPath ,
167+ tc . input ,
168+ tc . expected_output ,
169+ tc . timeout ,
170+ ques_name
171+ )
137172 )
138173 ) ;
139-
140- // Save results to a deterministic key that judge will read:
141- // job:<ques_name>:worker:<assignedWorkerId>:results
174+ console . log ( results , "this is results" ) ;
142175 await redis_server . setEx (
143- `job:${ ques_name } :worker:${ assignedWorkerId } :results ` ,
144- 120 ,
176+ `job:${ ques_name } :worker:${ WORKER_FIELD } ` ,
177+ 30 ,
145178 JSON . stringify ( results )
146179 ) ;
147-
148- // Mark status for that worker slot. Value contains which pod actually ran it.
149180 await redis_server . hSet ( `job:${ ques_name } :status` , {
150- [ assignedWorkerId ] : WORKER_FIELD ,
181+ [ WORKER_FIELD ] : "completed" ,
151182 } ) ;
152- await redis_server . expire ( `job:${ ques_name } :status` , 120 ) ;
153-
154- console . log ( `Completed job ${ ques_name } slot ${ assignedWorkerId } by pod ${ WORKER_FIELD } ` ) ;
183+ await redis_server . expire ( `job:${ ques_name } :status` , 30 ) ;
155184 } catch ( err ) {
156185 console . error ( "Error during job processing:" , err ) ;
157-
158186 await redis_server . setEx (
159- `job:${ ques_name } :worker:${ assignedWorkerId } :results ` ,
160- 120 ,
187+ `job:${ ques_name } :worker:${ WORKER_FIELD } ` ,
188+ 30 ,
161189 JSON . stringify ( [ { error : err . toString ( ) } ] )
162190 ) ;
163191 await redis_server . hSet ( `job:${ ques_name } :status` , {
164- [ assignedWorkerId ] : WORKER_FIELD ,
192+ [ WORKER_FIELD ] : "completed" ,
165193 } ) ;
166194 } finally {
167- // cleanup
168- try { fs . unlinkSync ( filePath ) ; } catch ( _ ) { }
195+ try {
196+ fs . unlinkSync ( filePath ) ;
197+ } catch { }
169198 try {
170199 if ( language === "cpp" ) fs . unlinkSync ( execPath ) ;
171- if ( language === "java" ) fs . unlinkSync ( filePath . replace ( ".java" , ".class" ) ) ;
172- } catch ( _ ) { }
200+ if ( language === "java" )
201+ fs . unlinkSync ( filePath . replace ( ".java" , ".class" ) ) ;
202+ } catch { }
173203 }
174204}
175205
176- // ---------- main queue loop ----------
177206async function pollForJobs ( ) {
178207 while ( true ) {
179208 try {
180- // brPop returns object like { key, element } where element is the pushed string or Buffer
181- const br = await redis_server . brPop ( "job_queue" , 0 ) ;
182- if ( ! br || ! br . element ) continue ;
183-
184- // --- robust parse of queue item ---
185- const raw = br . element ;
186- const elemStr = Buffer . isBuffer ( raw ) ? raw . toString ( ) : String ( raw ) . trim ( ) ;
187-
188- let payload ;
189- try {
190- payload = JSON . parse ( elemStr ) ;
191- } catch ( err ) {
192- // fallback to legacy/raw redis-key form
193- console . warn ( "Queue item not JSON, treating as simple jobId:" , elemStr ) ;
194- // if judge previously pushed raw redisKey, use that as redisKey as well
195- payload = { ques_name : elemStr , workerId : null , redisKey : elemStr } ;
196- }
197-
198- // Prefer payload.redisKey if provided by judge
199- const { ques_name, workerId, redisKey : rkFromPayload } = payload ;
200- const redisKey = rkFromPayload
201- ? rkFromPayload
202- : workerId
203- ? `job:${ ques_name } :worker:${ workerId } `
204- : `job:${ ques_name } ` ;
205-
206- console . log ( `Got job payload: ${ JSON . stringify ( payload ) } (using redisKey=${ redisKey } ) by pod ${ WORKER_FIELD } ` ) ;
207-
208- // job was stored using hSet(redisKey, { code, language, testcases })
209- const code = await redis_server . hGet ( redisKey , "code" ) ;
210- const language = await redis_server . hGet ( redisKey , "language" ) ;
211- const data_testcases = await redis_server . hGet ( redisKey , "testcases" ) ;
212-
213- if ( ! data_testcases ) {
214- console . warn ( `No testcases found for redisKey=${ redisKey } payload=${ JSON . stringify ( payload ) } ` ) ;
215- // continue to next queue item
216- continue ;
217- }
209+ const { element : ques_name } = await redis_server . brPop ( "job_queue" , 0 ) ;
210+ console . log ( `Got job: ${ ques_name } ` ) ;
218211
212+ const code = await redis_server . hGet ( ques_name , "code" ) ;
213+ const language = await redis_server . hGet ( ques_name , "language" ) ;
214+ const data_testcases = await redis_server . hGet ( ques_name , WORKER_FIELD ) ;
215+ if ( ! data_testcases ) continue ;
219216 const testcases = JSON . parse ( data_testcases ) ;
220-
221- // Process this worker-slot (if workerId present, pass it; else process whole job)
222- const assignedWorkerId = workerId || WORKER_FIELD ;
223- await processJob ( ques_name , code , language , testcases , assignedWorkerId ) ;
217+ await processJob ( ques_name , code , language , testcases ) ;
224218 } catch ( err ) {
225219 console . error ( "Error while polling job:" , err ) ;
226- // small backoff to avoid tight error loop
227- await new Promise ( ( r ) => setTimeout ( r , 500 ) ) ;
228220 }
229221 }
230222}
231-
232223pollForJobs ( ) ;
233224
234- // health endpoint
235225app . get ( "/ping" , ( req , res ) => {
236- res . send ( `Worker ${ WORKER_FIELD } is awake` ) ;
226+ console . log ( "Ping received at" , new Date ( ) . toISOString ( ) ) ;
227+ res . send ( "Worker is awake" ) ;
237228} ) ;
238229
239230app . listen ( port , ( ) => {
240231 console . log ( `${ WORKER_FIELD } running at port ${ port } ` ) ;
241- } ) ;
232+ } ) ;
0 commit comments