@@ -4,7 +4,6 @@ import { connectredis } from "./redis.js";
44
55const redis = await connectredis ( ) ;
66
7- // This should be dynamically discovered later via Redis or service discovery
87const worker_running = [
98 "https://workers-judge.onrender.com/" ,
109 "https://workers-judge-1.onrender.com/" ,
@@ -13,13 +12,6 @@ const worker_running = [
1312
1413/**
1514 * Judge Function
16- * @param {string } codePath - Absolute path to uploaded code file
17- * @param {string } language - Programming language
18- * @param {string } ques_name - Unique job/question ID
19- * @param {string } input - Raw input test cases (separated by ###)
20- * @param {string } output - Raw expected outputs (separated by ###)
21- * @param {number } timeout - Max timeout per testcase in seconds
22- * @param {number } sizeout - Max output size in KB
2315 */
2416export async function judge ( {
2517 codePath,
@@ -38,64 +30,50 @@ export async function judge({
3830
3931 let code ;
4032 try {
41- // Read user code
33+ // 1. Read user code
4234 code = fs . readFileSync ( codePath , "utf-8" ) ;
4335
4436 // Security checks
45- if (
46- code . includes ( "fopen" ) ||
47- code . includes ( "system" ) ||
48- code . includes ( "fork" )
49- ) {
37+ if ( code . includes ( "fopen" ) || code . includes ( "system" ) || code . includes ( "fork" ) ) {
5038 throw new Error ( "Potentially dangerous code detected." ) ;
5139 }
5240
53- // Split inputs & outputs
54- const inputParts = input
55- . split ( "###" )
56- . map ( ( s ) => s . trim ( ) )
57- . filter ( Boolean ) ;
58- const outputParts = output
59- . split ( "###" )
60- . map ( ( s ) => s . trim ( ) )
61- . filter ( Boolean ) ;
41+ // 2. Split inputs & outputs
42+ const inputParts = input . split ( "###" ) . map ( s => s . trim ( ) ) . filter ( Boolean ) ;
43+ const outputParts = output . split ( "###" ) . map ( s => s . trim ( ) ) . filter ( Boolean ) ;
6244
6345 if ( inputParts . length !== outputParts . length ) {
6446 throw new Error ( "Number of inputs and outputs do not match!" ) ;
6547 }
6648
67- // Build testcases array
68- const testcases = inputParts . map ( ( input , i ) => ( {
69- input,
49+ // 3. Build testcases array
50+ const testcases = inputParts . map ( ( inp , i ) => ( {
51+ input : inp ,
7052 expected_output : outputParts [ i ] ,
7153 correct : null ,
7254 timeout : timeout || 2.5 ,
7355 sizeout,
7456 result : null ,
7557 } ) ) ;
7658
77- // At least 150 testcases per worker
7859 const totalWorkers = worker_running . length ;
79- let minPerWorker = 100 ;
60+ const minPerWorker = 100 ;
8061 const workerTaskMap = { } ;
8162
63+ // Distribute test cases among workers
8264 if ( testcases . length <= minPerWorker ) {
83- // All testcases go to a single worker
8465 workerTaskMap [ "worker_0" ] = testcases ;
8566 } else {
86- // Distribute evenly while ensuring each worker gets at least 150 testcases
8767 let idx = 0 ;
88- for ( let tc of testcases ) {
89- const workerId = `worker_${
90- Math . floor ( idx / minPerWorker ) % totalWorkers
91- } `;
68+ for ( const tc of testcases ) {
69+ const workerId = `worker_${ Math . floor ( idx / minPerWorker ) % totalWorkers } ` ;
9270 if ( ! workerTaskMap [ workerId ] ) workerTaskMap [ workerId ] = [ ] ;
9371 workerTaskMap [ workerId ] . push ( tc ) ;
9472 idx ++ ;
9573 }
9674 }
9775
98- // Store data in Redis for each worker
76+ // 4. Store data in Redis and push jobs to queue
9977 await Promise . all (
10078 Object . entries ( workerTaskMap ) . map ( async ( [ workerId , tcList ] ) => {
10179 const redisPayload = {
@@ -104,56 +82,62 @@ export async function judge({
10482 testcases : JSON . stringify ( tcList ) ,
10583 } ;
10684
107- // Each worker gets its own redis entry
108- await redis . hSet ( ques_name , redisPayload ) ;
85+ const redisKey = `job:${ ques_name } :worker:${ workerId } ` ;
10986
110- // Push job to queue
111- await redis . lPush ( "job_queue" , ques_name ) ;
87+ // Store test cases for this worker
88+ await redis . hSet ( redisKey , redisPayload ) ;
89+
90+ // Push job to queue - FIXED: Store the Redis key instead of JSON object
91+ await redis . lPush ( "job_queue" , redisKey ) ;
11292 } )
11393 ) ;
11494
115- // Track completion
95+ console . log ( `[INFO] Job ${ ques_name } queued for ${ Object . keys ( workerTaskMap ) . length } workers` ) ;
96+
97+ // 5. Wait until all workers complete
11698 const waitUntilCompleted = async ( ) => {
117- const POLL_INTERVAL = 500 ; // 0.5 sec
118- const MAX_ATTEMPTS = 60 ; // 30 sec total
99+ const POLL_INTERVAL = 500 ; // ms
100+ const MAX_ATTEMPTS = 120 ; // Increased to 60 sec total
119101 let attempts = 0 ;
120102
121103 while ( attempts < MAX_ATTEMPTS ) {
122104 const status = await redis . hGetAll ( `job:${ ques_name } :status` ) ;
123- if ( Object . keys ( status ) . length === totalWorkerCount ) {
105+ if ( Object . keys ( status ) . length === Object . keys ( workerTaskMap ) . length ) {
124106 return true ;
125107 }
126- await new Promise ( ( res ) => setTimeout ( res , POLL_INTERVAL ) ) ;
108+ await new Promise ( res => setTimeout ( res , POLL_INTERVAL ) ) ;
127109 attempts ++ ;
128110 }
129111 return false ;
130112 } ;
131113
132114 const completed = await waitUntilCompleted ( ) ;
133115 if ( ! completed ) {
134- throw new Error ( " Timeout waiting for workers to finish" ) ;
116+ throw new Error ( ` Timeout waiting for workers to finish for job ${ ques_name } ` ) ;
135117 }
136118
137- // Collect results
119+ // 6. Collect results from all workers
138120 const results = [ ] ;
139- for ( const workerId of Object . keys ( status ) ) {
140- const data = await redis . get ( `job:${ ques_name } :worker:${ workerId } ` ) ;
121+ for ( const workerId of Object . keys ( workerTaskMap ) ) {
122+ const redisKey = `job:${ ques_name } :worker:${ workerId } :results` ;
123+ const data = await redis . get ( redisKey ) ;
141124 if ( data ) {
142125 results . push ( ...JSON . parse ( data ) ) ;
143126 }
144127 }
145128
129+ console . log ( `[SUCCESS] Completed job ${ ques_name } ` ) ;
146130 return {
147131 jobId : ques_name ,
148132 results,
149133 } ;
150134 } catch ( err ) {
151- console . error ( "Judge Error:" , err ) ;
135+ console . error ( "[ERROR] Judge Error:" , err ) ;
152136 throw err ;
153137 } finally {
154138 // Cleanup uploaded code file
155139 if ( fs . existsSync ( codePath ) ) {
156140 fs . unlinkSync ( codePath ) ;
157141 }
158142 }
159- }
143+ }
0 commit comments