11use std:: sync:: Arc ;
22
33use crate :: ops:: { DbPool , RunnerOperations } ;
4- use crate :: runtime :: Worker ;
5- use crate :: store :: { self , Binding , bindings_to_infos } ;
4+ use crate :: store :: { self , WorkerWithBindings } ;
5+ use crate :: worker :: { create_worker , prepare_script } ;
66use crate :: worker_pool:: WORKER_POOL ;
77
8- use openworkers_core:: { RuntimeLimits , ScheduledInit , Script , Task , WorkerCode } ;
8+ use openworkers_core:: { RuntimeLimits , ScheduledInit , Task , TerminationReason } ;
99
1010use serde:: Deserialize ;
1111use serde:: Serialize ;
@@ -21,11 +21,19 @@ pub struct ScheduledData {
2121
2222fn run_scheduled (
2323 data : ScheduledData ,
24- script : Script ,
25- bindings : Vec < Binding > ,
24+ worker_data : WorkerWithBindings ,
2625 db_pool : DbPool ,
2726 global_log_tx : std:: sync:: mpsc:: Sender < crate :: log:: LogMessage > ,
2827) {
28+ // Parse script before spawning (fail fast)
29+ let script = match prepare_script ( & worker_data) {
30+ Ok ( s) => s,
31+ Err ( err) => {
32+ log:: error!( "Failed to prepare script for scheduled task: {err:?}" ) ;
33+ return ;
34+ }
35+ } ;
36+
2937 // Try to acquire a worker slot
3038 let permit = match crate :: worker_pool:: WORKER_SEMAPHORE
3139 . clone ( )
@@ -41,7 +49,9 @@ fn run_scheduled(
4149 }
4250 } ;
4351
44- let worker_id = data. worker_id . clone ( ) ;
52+ let worker_id = worker_data. id . clone ( ) ;
53+ let bindings = worker_data. bindings . clone ( ) ;
54+ let code_type = worker_data. code_type . clone ( ) ;
4555 let ( log_tx, log_handler) = crate :: log:: create_log_handler ( worker_id. clone ( ) , global_log_tx) ;
4656
4757 // Use the sequential worker pool - ensures ONE V8 isolate per thread at a time
@@ -65,10 +75,11 @@ fn run_scheduled(
6575 . with_db_pool ( db_pool) ,
6676 ) ;
6777
68- let mut worker = match Worker :: new_with_ops ( script, Some ( limits) , ops) . await {
69- Ok ( worker) => worker,
78+ // Create worker
79+ let mut worker = match create_worker ( script, limits, ops, & code_type) . await {
80+ Ok ( w) => w,
7081 Err ( err) => {
71- log:: error!( "failed to create scheduled worker: {err}" ) ;
82+ log:: error!( "failed to create scheduled worker: {err:? }" ) ;
7283 log_handler. flush ( ) ;
7384 return ;
7485 }
@@ -79,6 +90,7 @@ fn run_scheduled(
7990 let task = Task :: Scheduled ( Some ( ScheduledInit :: new ( res_tx, data. scheduled_time ) ) ) ;
8091
8192 log:: debug!( "exec scheduled task" ) ;
93+
8294 match worker. exec ( task) . await {
8395 Ok ( ( ) ) => {
8496 log:: debug!( "scheduled task completed successfully" ) ;
@@ -88,27 +100,23 @@ fn run_scheduled(
88100 Err ( err) => log:: error!( "scheduled task response error: {err}" ) ,
89101 }
90102 }
91- Err ( reason) => {
92- use openworkers_core:: TerminationReason ;
93-
94- match reason {
95- TerminationReason :: CpuTimeLimit => {
96- log:: warn!( "scheduled task terminated: CPU time limit exceeded" ) ;
97- }
98- TerminationReason :: WallClockTimeout => {
99- log:: warn!( "scheduled task terminated: wall-clock timeout" ) ;
100- }
101- TerminationReason :: MemoryLimit => {
102- log:: warn!( "scheduled task terminated: memory limit exceeded" ) ;
103- }
104- TerminationReason :: Exception ( msg) => {
105- log:: error!( "scheduled task terminated: uncaught exception: {}" , msg) ;
106- }
107- _ => {
108- log:: error!( "scheduled task terminated: {:?}" , reason) ;
109- }
103+ Err ( reason) => match reason {
104+ TerminationReason :: CpuTimeLimit => {
105+ log:: warn!( "scheduled task terminated: CPU time limit exceeded" ) ;
110106 }
111- }
107+ TerminationReason :: WallClockTimeout => {
108+ log:: warn!( "scheduled task terminated: wall-clock timeout" ) ;
109+ }
110+ TerminationReason :: MemoryLimit => {
111+ log:: warn!( "scheduled task terminated: memory limit exceeded" ) ;
112+ }
113+ TerminationReason :: Exception ( msg) => {
114+ log:: error!( "scheduled task terminated: uncaught exception: {}" , msg) ;
115+ }
116+ _ => {
117+ log:: error!( "scheduled task terminated: {:?}" , reason) ;
118+ }
119+ } ,
112120 }
113121
114122 // CRITICAL: Flush logs before worker is dropped to prevent log loss
@@ -165,44 +173,16 @@ pub fn handle_scheduled(
165173 log:: debug!( "scheduled task parsed: {:?}" , data) ;
166174
167175 let worker_id = store:: WorkerIdentifier :: Id ( data. worker_id . clone ( ) ) ;
168- let worker = match store:: get_worker_with_bindings ( & mut conn, worker_id) . await {
169- Some ( worker) => worker,
176+ let worker_data = match store:: get_worker_with_bindings ( & mut conn, worker_id) . await
177+ {
178+ Some ( w) => w,
170179 None => {
171180 log:: error!( "worker not found: {:?}" , data. worker_id) ;
172181 continue ;
173182 }
174183 } ;
175184
176- let code =
177- match crate :: transform:: parse_worker_code_str ( & worker. script , & worker. language )
178- {
179- Ok ( code) => WorkerCode :: js ( code) ,
180- Err ( e) => {
181- log:: error!( "Failed to parse worker code for scheduled task: {}" , e) ;
182- continue ;
183- }
184- } ;
185-
186- // Convert bindings to BindingInfo for Script (names + types only)
187- let binding_infos = bindings_to_infos ( & worker. bindings ) ;
188-
189- let script = Script {
190- code,
191- env : if worker. env . is_empty ( ) {
192- None
193- } else {
194- Some ( worker. env . clone ( ) )
195- } ,
196- bindings : binding_infos,
197- } ;
198-
199- run_scheduled (
200- data,
201- script,
202- worker. bindings ,
203- db. clone ( ) ,
204- global_log_tx. clone ( ) ,
205- ) ;
185+ run_scheduled ( data, worker_data, db. clone ( ) , global_log_tx. clone ( ) ) ;
206186 }
207187
208188 log:: debug!( "scheduled task listener stopped" ) ;
0 commit comments